You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/07/20 04:41:36 UTC

kafka git commit: Revert "MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX"

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 40e36fccb -> 021960be8


Revert "MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX"

This reverts commit 40e36fccb6d6075e206347489391e46d505d6004.

See https://issues.apache.org/jira/browse/KAFKA-5608 and https://github.com/apache/kafka/pull/3547 for more details.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/021960be
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/021960be
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/021960be

Branch: refs/heads/0.10.2
Commit: 021960be87c3b69bf867e153d58cc7d0e3e989b2
Parents: 40e36fc
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jul 19 21:40:39 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jul 19 21:40:39 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py | 40 ++++++++++++-----------
 1 file changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/021960be/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 53ed357..d55d012 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -250,25 +250,27 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-
-        self.init_jmx_attributes()
-        self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
-        self.start_jmx_tool(idx, node)
-
-        for line in consumer_output:
-            msg = line.strip()
-            if msg == "shutdown_complete":
-                # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
-                if node in self.clean_shutdown_nodes:
-                    raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
-                self.clean_shutdown_nodes.add(node)
-            else:
-                if self.message_validator is not None:
-                    msg = self.message_validator(msg)
-                if msg is not None:
-                    self.messages_consumed[idx].append(msg)
-
-        self.read_jmx_output(idx, node)
+        first_line = next(consumer_output, None)
+
+        if first_line is not None:
+            self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+            self.init_jmx_attributes()
+            self.start_jmx_tool(idx, node)
+
+            for line in itertools.chain([first_line], consumer_output):
+                msg = line.strip()
+                if msg == "shutdown_complete":
+                    # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
+                    if node in self.clean_shutdown_nodes:
+                        raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
+                    self.clean_shutdown_nodes.add(node)
+                else:
+                    if self.message_validator is not None:
+                        msg = self.message_validator(msg)
+                    if msg is not None:
+                        self.messages_consumed[idx].append(msg)
+
+            self.read_jmx_output(idx, node)
 
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)