You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/03/01 00:46:45 UTC
[kafka] branch 2.4 updated: MINOR: add wait_for_assigned_partitions
to console-consumer (#8192)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new bd4feef MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
bd4feef is described below
commit bd4feef3dd66e4908d901be772344cb64c333087
Author: Brian Bushree <bb...@confluent.io>
AuthorDate: Sat Feb 29 16:43:51 2020 -0800
MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
what/why
The throttling_test was broken by an earlier PR (#7785), because the test depends on the consumer having partitions assigned before the producer is started.
This PR adds the ability for the console consumer to wait until partitions have been assigned before it is considered started.
caveat
Starting the JmxTool inside the console-consumer for custom metrics is not supported while the wait_until_partitions_assigned flag is in use, since the code assumes one JmxTool running per node.
I think a proper fix for this would be to make JmxTool its own standalone single-node service.
alternatives
We could use the EndToEnd test suite, which uses the verifiable producer/consumer under the hood, but unfortunately I found that more changes were necessary to get this working (specifically, it doesn't seem like this test suite plays nicely with the ProducerPerformanceService).
Reviewers: Mathew Wong <mw...@confluent.io>, Bill Bejeck <bbejeck.com>
---
tests/kafkatest/services/console_consumer.py | 28 ++++++++++++++++++++--
tests/kafkatest/services/monitor/jmx.py | 19 +++++++++++++--
.../tests/core/fetch_from_follower_test.py | 16 +------------
tests/kafkatest/tests/core/throttling_test.py | 3 ++-
4 files changed, 46 insertions(+), 20 deletions(-)
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 5fd4712..e3d1910 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -17,9 +17,10 @@ import itertools
import os
from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
"""
@@ -62,7 +63,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False,
isolation_level="read_uncommitted", jaas_override_variables=None,
- kafka_opts_override="", client_prop_file_override="", consumer_properties={}):
+ kafka_opts_override="", client_prop_file_override="", consumer_properties={},
+ wait_until_partitions_assigned=False):
"""
Args:
context: standard context
@@ -122,6 +124,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
self.kafka_opts_override = kafka_opts_override
self.client_prop_file_override = client_prop_file_override
self.consumer_properties = consumer_properties
+ self.wait_until_partitions_assigned = wait_until_partitions_assigned
def prop_file(self, node):
@@ -268,8 +271,29 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
with self.lock:
self.read_jmx_output(idx, node)
+ def _wait_until_partitions_assigned(self, node, timeout_sec=60):
+ if self.jmx_object_names is not None:
+ raise Exception("'wait_until_partitions_assigned' is not supported while using 'jmx_object_names'/'jmx_attributes'")
+ jmx_tool = JmxTool(self.context, jmx_poll_ms=100)
+ jmx_tool.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
+ jmx_tool.jmx_attributes = ["assigned-partitions"]
+ jmx_tool.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id
+ jmx_tool.start_jmx_tool(self.idx(node), node)
+ assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id
+
+ def read_and_check():
+ jmx_tool.read_jmx_output(self.idx(node), node)
+ return assigned_partitions_jmx_attr in jmx_tool.maximum_jmx_value
+
+ wait_until(lambda: read_and_check(),
+ timeout_sec=timeout_sec,
+ backoff_sec=.5,
+ err_msg="consumer was not assigned partitions within %d seconds" % timeout_sec)
+
def start_node(self, node):
BackgroundThreadService.start_node(self, node)
+ if self.wait_until_partitions_assigned:
+ self._wait_until_partitions_assigned(node)
def stop_node(self, node):
self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account)))
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index c5b747d..2dcd369 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -17,6 +17,8 @@ import os
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
class JmxMixin(object):
@@ -41,10 +43,11 @@ class JmxMixin(object):
self.jmx_tool_log = os.path.join(root, "jmx_tool.log")
self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log")
- def clean_node(self, node):
+ def clean_node(self, node, idx=None):
node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False,
allow_fail=True)
- idx = self.idx(node)
+ if idx is None:
+ idx = self.idx(node)
self.started[idx-1] = False
node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False)
@@ -139,3 +142,15 @@ class JmxMixin(object):
def jmx_class_name(self):
return "kafka.tools.JmxTool"
+
+class JmxTool(JmxMixin, KafkaPathResolverMixin):
+ """
+ Simple helper class for using the JmxTool directly instead of as a mix-in
+ """
+ def __init__(self, text_context, *args, **kwargs):
+ JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
+ self.context = text_context
+
+ @property
+ def logger(self):
+ return self.context.logger
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index fde1baf..ef37728 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -18,29 +18,15 @@ from collections import defaultdict
from ducktape.mark.resource import cluster
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxTool
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
-class JmxTool(JmxMixin, KafkaPathResolverMixin):
- """
- Simple helper class for using the JmxTool directly instead of as a mix-in
- """
- def __init__(self, text_context, *args, **kwargs):
- JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
- self.context = text_context
-
- @property
- def logger(self):
- return self.context.logger
-
-
class FetchFromFollowerTest(ProduceConsumeValidateTest):
RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector"
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 4a8327e..f29ec2b 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -165,7 +165,8 @@ class ThrottlingTest(ProduceConsumeValidateTest):
self.topic,
consumer_timeout_ms=60000,
message_validator=is_int,
- from_beginning=False)
+ from_beginning=False,
+ wait_until_partitions_assigned=True)
self.kafka.start()
bulk_producer.run()