You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/20 02:44:58 UTC
[kafka] branch trunk updated: MINOR: fix failing Streams system tests (#5928)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 190cbd9 MINOR: fix failing Streams system tests (#5928)
190cbd9 is described below
commit 190cbd9fe59e3baa2d6002d60ed241120f346228
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Nov 19 18:44:45 2018 -0800
MINOR: fix failing Streams system tests (#5928)
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../kafkatest/tests/streams/streams_upgrade_test.py | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 4314a35..baf507b 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -310,14 +310,23 @@ class StreamsUpgradeTest(Test):
if self.leader is None:
raise Exception("Could not identify leader")
+ def get_version_string(self, version):
+ if version.startswith("0") or version.startswith("1") \
+ or version.startswith("2.0") or version.startswith("2.1"):
+ return "Kafka version : " + version
+ else:
+ return "Kafka version: " + version
+
def start_all_nodes_with(self, version):
+ kafka_version_str = self.get_version_string(version)
+
# start first with <version>
self.prepare_for(self.processor1, version)
node1 = self.processor1.node
with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
self.processor1.start()
- log_monitor.wait_until("Kafka version : " + version,
+ log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
monitor.wait_until("processed 100 records from topic",
@@ -331,7 +340,7 @@ class StreamsUpgradeTest(Test):
with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
self.processor2.start()
- log_monitor.wait_until("Kafka version : " + version,
+ log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
first_monitor.wait_until("processed 100 records from topic",
@@ -349,7 +358,7 @@ class StreamsUpgradeTest(Test):
with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
self.processor3.start()
- log_monitor.wait_until("Kafka version : " + version,
+ log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
first_monitor.wait_until("processed 100 records from topic",
@@ -371,6 +380,8 @@ class StreamsUpgradeTest(Test):
processor.set_version(version)
def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
+ kafka_version_str = self.get_version_string(new_version)
+
first_other_processor = None
second_other_processor = None
for p in self.processors:
@@ -418,7 +429,7 @@ class StreamsUpgradeTest(Test):
with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
processor.start()
- log_monitor.wait_until("Kafka version : " + new_version,
+ log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
first_other_monitor.wait_until("processed 100 records from topic",
@@ -470,7 +481,7 @@ class StreamsUpgradeTest(Test):
self.old_processors.remove(processor)
self.upgraded_processors.append(processor)
- log_monitor.wait_until("Kafka version : " + str(DEV_VERSION),
+ log_monitor.wait_until("Kafka version: " + str(DEV_VERSION),
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account))
log_monitor.offset = 5