You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/07/20 04:41:36 UTC
kafka git commit: Revert "MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX"
Repository: kafka
Updated Branches:
refs/heads/0.10.2 40e36fccb -> 021960be8
Revert "MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX"
This reverts commit 40e36fccb6d6075e206347489391e46d505d6004.
See https://issues.apache.org/jira/browse/KAFKA-5608 and https://github.com/apache/kafka/pull/3547 for more details.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/021960be
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/021960be
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/021960be
Branch: refs/heads/0.10.2
Commit: 021960be87c3b69bf867e153d58cc7d0e3e989b2
Parents: 40e36fc
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Jul 19 21:40:39 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Jul 19 21:40:39 2017 -0700
----------------------------------------------------------------------
tests/kafkatest/services/console_consumer.py | 40 ++++++++++++-----------
1 file changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/021960be/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 53ed357..d55d012 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -250,25 +250,27 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
self.logger.debug("Console consumer %d command: %s", idx, cmd)
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-
- self.init_jmx_attributes()
- self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
- self.start_jmx_tool(idx, node)
-
- for line in consumer_output:
- msg = line.strip()
- if msg == "shutdown_complete":
- # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
- if node in self.clean_shutdown_nodes:
- raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
- self.clean_shutdown_nodes.add(node)
- else:
- if self.message_validator is not None:
- msg = self.message_validator(msg)
- if msg is not None:
- self.messages_consumed[idx].append(msg)
-
- self.read_jmx_output(idx, node)
+ first_line = next(consumer_output, None)
+
+ if first_line is not None:
+ self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+ self.init_jmx_attributes()
+ self.start_jmx_tool(idx, node)
+
+ for line in itertools.chain([first_line], consumer_output):
+ msg = line.strip()
+ if msg == "shutdown_complete":
+ # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater
+ if node in self.clean_shutdown_nodes:
+ raise Exception("Unexpected shutdown event from consumer, already shutdown. Consumer index: %d" % idx)
+ self.clean_shutdown_nodes.add(node)
+ else:
+ if self.message_validator is not None:
+ msg = self.message_validator(msg)
+ if msg is not None:
+ self.messages_consumed[idx].append(msg)
+
+ self.read_jmx_output(idx, node)
def start_node(self, node):
BackgroundThreadService.start_node(self, node)