You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/20 02:44:58 UTC

[kafka] branch trunk updated: MINOR: fix failing Streams system tests (#5928)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 190cbd9  MINOR: fix failing Streams system tests (#5928)
190cbd9 is described below

commit 190cbd9fe59e3baa2d6002d60ed241120f346228
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Nov 19 18:44:45 2018 -0800

    MINOR: fix failing Streams system tests (#5928)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../kafkatest/tests/streams/streams_upgrade_test.py | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 4314a35..baf507b 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -310,14 +310,23 @@ class StreamsUpgradeTest(Test):
         if self.leader is None:
             raise Exception("Could not identify leader")
 
+    def get_version_string(self, version):
+        if version.startswith("0") or version.startswith("1") \
+          or version.startswith("2.0") or version.startswith("2.1"):
+            return "Kafka version : " + version
+        else:
+            return "Kafka version: " + version
+
     def start_all_nodes_with(self, version):
+        kafka_version_str = self.get_version_string(version)
+
         # start first with <version>
         self.prepare_for(self.processor1, version)
         node1 = self.processor1.node
         with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
             with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
                 self.processor1.start()
-                log_monitor.wait_until("Kafka version : " + version,
+                log_monitor.wait_until(kafka_version_str,
                                        timeout_sec=60,
                                        err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
                 monitor.wait_until("processed 100 records from topic",
@@ -331,7 +340,7 @@ class StreamsUpgradeTest(Test):
             with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
                 with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
                     self.processor2.start()
-                    log_monitor.wait_until("Kafka version : " + version,
+                    log_monitor.wait_until(kafka_version_str,
                                            timeout_sec=60,
                                            err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
                     first_monitor.wait_until("processed 100 records from topic",
@@ -349,7 +358,7 @@ class StreamsUpgradeTest(Test):
                 with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
                     with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
                         self.processor3.start()
-                        log_monitor.wait_until("Kafka version : " + version,
+                        log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
                                                err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
                         first_monitor.wait_until("processed 100 records from topic",
@@ -371,6 +380,8 @@ class StreamsUpgradeTest(Test):
             processor.set_version(version)
 
     def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
+        kafka_version_str = self.get_version_string(new_version)
+
         first_other_processor = None
         second_other_processor = None
         for p in self.processors:
@@ -418,7 +429,7 @@ class StreamsUpgradeTest(Test):
                     with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
                         processor.start()
 
-                        log_monitor.wait_until("Kafka version : " + new_version,
+                        log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
                                                err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
                         first_other_monitor.wait_until("processed 100 records from topic",
@@ -470,7 +481,7 @@ class StreamsUpgradeTest(Test):
                     self.old_processors.remove(processor)
                     self.upgraded_processors.append(processor)
 
-                    log_monitor.wait_until("Kafka version : " + str(DEV_VERSION),
+                    log_monitor.wait_until("Kafka version: " + str(DEV_VERSION),
                                            timeout_sec=60,
                                            err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account))
                     log_monitor.offset = 5