You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/03/01 00:46:45 UTC

[kafka] branch 2.4 updated: MINOR: add wait_for_assigned_partitions to console-consumer (#8192)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new bd4feef  MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
bd4feef is described below

commit bd4feef3dd66e4908d901be772344cb64c333087
Author: Brian Bushree <bb...@confluent.io>
AuthorDate: Sat Feb 29 16:43:51 2020 -0800

    MINOR: add wait_for_assigned_partitions to console-consumer (#8192)
    
    what/why
    the throttling_test was broken by PR #7785, since the test depends on the consumer having partitions assigned before the producer is started
    
    this PR provides the ability to wait for partitions to be assigned in the console consumer before considering it started.
    
    caveat
    the wait_until_partitions_assigned flag cannot be combined with starting up the JmxTool inside the console-consumer for custom metrics (jmx_object_names/jmx_attributes), since the code assumes only one JmxTool is running per node.
    
    I think a proper fix for this would be to make JmxTool its own standalone single-node service
    
    alternatives
    we could use the EndToEnd test suite, which uses the verifiable producer/consumer under the hood, but I found that more changes were necessary to get that working (specifically, that test suite does not seem to play nicely with the ProducerPerformanceService)
    
    Reviewers: Mathew Wong <mw...@confluent.io>, Bill Bejeck <bbejeck.com>
---
 tests/kafkatest/services/console_consumer.py       | 28 ++++++++++++++++++++--
 tests/kafkatest/services/monitor/jmx.py            | 19 +++++++++++++--
 .../tests/core/fetch_from_follower_test.py         | 16 +------------
 tests/kafkatest/tests/core/throttling_test.py      |  3 ++-
 4 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 5fd4712..e3d1910 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -17,9 +17,10 @@ import itertools
 import os
 
 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
 from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
 
 """
@@ -62,7 +63,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
                  enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False,
                  isolation_level="read_uncommitted", jaas_override_variables=None,
-                 kafka_opts_override="", client_prop_file_override="", consumer_properties={}):
+                 kafka_opts_override="", client_prop_file_override="", consumer_properties={},
+                 wait_until_partitions_assigned=False):
         """
         Args:
             context:                    standard context
@@ -122,6 +124,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.kafka_opts_override = kafka_opts_override
         self.client_prop_file_override = client_prop_file_override
         self.consumer_properties = consumer_properties
+        self.wait_until_partitions_assigned = wait_until_partitions_assigned
 
 
     def prop_file(self, node):
@@ -268,8 +271,29 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         with self.lock:
             self.read_jmx_output(idx, node)
 
+    def _wait_until_partitions_assigned(self, node, timeout_sec=60):
+        if self.jmx_object_names is not None:
+            raise Exception("'wait_until_partitions_assigned' is not supported while using 'jmx_object_names'/'jmx_attributes'")
+        jmx_tool = JmxTool(self.context, jmx_poll_ms=100)
+        jmx_tool.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
+        jmx_tool.jmx_attributes = ["assigned-partitions"]
+        jmx_tool.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id
+        jmx_tool.start_jmx_tool(self.idx(node), node)
+        assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id
+
+        def read_and_check():
+            jmx_tool.read_jmx_output(self.idx(node), node)
+            return assigned_partitions_jmx_attr in jmx_tool.maximum_jmx_value
+
+        wait_until(lambda: read_and_check(),
+                timeout_sec=timeout_sec,
+                backoff_sec=.5,
+                err_msg="consumer was not assigned partitions within %d seconds" % timeout_sec)
+
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)
+        if self.wait_until_partitions_assigned:
+            self._wait_until_partitions_assigned(node)
 
     def stop_node(self, node):
         self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account)))
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index c5b747d..2dcd369 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -17,6 +17,8 @@ import os
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.utils.util import wait_until
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
 
 class JmxMixin(object):
@@ -41,10 +43,11 @@ class JmxMixin(object):
         self.jmx_tool_log = os.path.join(root, "jmx_tool.log")
         self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log")
 
-    def clean_node(self, node):
+    def clean_node(self, node, idx=None):
         node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False,
                                          allow_fail=True)
-        idx = self.idx(node)
+        if idx is None:
+            idx = self.idx(node)
         self.started[idx-1] = False
         node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False)
 
@@ -139,3 +142,15 @@ class JmxMixin(object):
 
     def jmx_class_name(self):
         return "kafka.tools.JmxTool"
+
+class JmxTool(JmxMixin, KafkaPathResolverMixin):
+    """
+    Simple helper class for using the JmxTool directly instead of as a mix-in
+    """
+    def __init__(self, text_context, *args, **kwargs):
+        JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
+        self.context = text_context
+
+    @property
+    def logger(self):
+        return self.context.logger
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index fde1baf..ef37728 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -18,29 +18,15 @@ from collections import defaultdict
 
 from ducktape.mark.resource import cluster
 
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.monitor.jmx import JmxTool
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 
 
-class JmxTool(JmxMixin, KafkaPathResolverMixin):
-    """
-    Simple helper class for using the JmxTool directly instead of as a mix-in
-    """
-    def __init__(self, text_context, *args, **kwargs):
-        JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
-        self.context = text_context
-
-    @property
-    def logger(self):
-        return self.context.logger
-
-
 class FetchFromFollowerTest(ProduceConsumeValidateTest):
 
     RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector"
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 4a8327e..f29ec2b 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -165,7 +165,8 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                                         self.topic,
                                         consumer_timeout_ms=60000,
                                         message_validator=is_int,
-                                        from_beginning=False)
+                                        from_beginning=False,
+                                        wait_until_partitions_assigned=True)
 
         self.kafka.start()
         bulk_producer.run()