Posted to commits@kafka.apache.org by ew...@apache.org on 2017/07/18 04:24:49 UTC

kafka git commit: MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX

Repository: kafka
Updated Branches:
  refs/heads/trunk 28c83d966 -> d663005fd


MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX

Waiting for the first line of output was added in KAFKA-2527, when JmxMixin was originally added, as a heuristic to
determine when the process was ready. We've since determined this is not good enough given JmxTool's limitations
and now include a separate, more reliable check before starting JmxTool. Waiting for the first line is also dangerous:
a consumer that is started before data is available in the topic won't output anything to stdout and only logs
errors to a separate log file. This means we may have a long delay between starting the process and starting JMX
monitoring.

Since we have a more reliable check for liveness via JMX now (and in cases that need it, partition assignment
metrics via JMX), we should no longer need to wait for the first line of output.
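
As a rough illustration of the ordering problem described above, here is a minimal, self-contained sketch. It does not
use the real test harness: fake_consumer_output, run_waiting_for_first_line, run_without_waiting and the "jmx_started"
marker are hypothetical stand-ins for ssh_capture, the old and new worker logic, and start_jmx_tool respectively.

```python
import itertools


def fake_consumer_output(events, lines):
    """Hypothetical stand-in for the lazy stdout iterator returned by ssh_capture."""
    for line in lines:
        # In the real service, this read is where the worker would block
        # until the consumer actually prints something.
        events.append("read:" + line)
        yield line + "\n"


def run_waiting_for_first_line(events, lines):
    """Old behaviour: monitoring starts only after the first line arrives."""
    output = fake_consumer_output(events, lines)
    first_line = next(output, None)
    consumed = []
    if first_line is not None:
        events.append("jmx_started")  # stand-in for start_jmx_tool
        for line in itertools.chain([first_line], output):
            consumed.append(line.strip())
    return consumed


def run_without_waiting(events, lines):
    """New behaviour: monitoring starts before any output is read."""
    output = fake_consumer_output(events, lines)
    events.append("jmx_started")  # stand-in for start_jmx_tool
    return [line.strip() for line in output]


if __name__ == "__main__":
    old_events, new_events = [], []
    run_waiting_for_first_line(old_events, ["a", "b"])
    run_without_waiting(new_events, ["a", "b"])
    print(old_events)
    print(new_events)
```

In this sketch the old ordering records a read before "jmx_started", and records no "jmx_started" at all when the
consumer never prints anything; the new ordering always records "jmx_started" first.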

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Ismael Juma <is...@juma.me.uk>, Apurva Mehta <ap...@confluent.io>

Closes #3447 from ewencp/dont-wait-first-line-console-consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d663005f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d663005f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d663005f

Branch: refs/heads/trunk
Commit: d663005fddcebafba439473299dc4c6ad74c966c
Parents: 28c83d9
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Mon Jul 17 21:24:45 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jul 17 21:24:45 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py | 40 +++++++++++------------
 1 file changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d663005f/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 6fad674..5a945e1 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -255,27 +255,25 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-        first_line = next(consumer_output, None)
-
-        if first_line is not None:
-            self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
-            self.init_jmx_attributes()
-            self.start_jmx_tool(idx, node)
-
-            for line in itertools.chain([first_line], consumer_output):
-                msg = line.strip()
-                if msg == "shutdown_complete":
-                    # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
-                    if node in self.clean_shutdown_nodes:
-                        raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
-                    self.clean_shutdown_nodes.add(node)
-                else:
-                    if self.message_validator is not None:
-                        msg = self.message_validator(msg)
-                    if msg is not None:
-                        self.messages_consumed[idx].append(msg)
-
-            self.read_jmx_output(idx, node)
+
+        self.init_jmx_attributes()
+        self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+        self.start_jmx_tool(idx, node)
+
+        for line in consumer_output:
+            msg = line.strip()
+            if msg == "shutdown_complete":
+                # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
+                if node in self.clean_shutdown_nodes:
+                    raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
+                self.clean_shutdown_nodes.add(node)
+            else:
+                if self.message_validator is not None:
+                    msg = self.message_validator(msg)
+                if msg is not None:
+                    self.messages_consumed[idx].append(msg)
+
+        self.read_jmx_output(idx, node)
 
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)