You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/05 19:50:55 UTC

[kafka] branch trunk updated: MINOR: Tighten up metadata upgrade test (#6531)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4aa2cfe  MINOR: Tighten up metadata upgrade test (#6531)
4aa2cfe is described below

commit 4aa2cfe467b9ccae1b7d8a83777f900b6a526f59
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri Apr 5 12:50:42 2019 -0700

    MINOR: Tighten up metadata upgrade test (#6531)
    
    Reviewers: Bill Bejeck <bb...@gmail.com>
---
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 .../tests/streams/streams_upgrade_test.py          | 59 ++++++++++++----------
 10 files changed, 40 insertions(+), 37 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index aa58d44..90e6ccd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -49,7 +49,7 @@ public class SmokeTestUtil {
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
-                        System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 32f96a0..c2d8c4d 100644
--- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -83,7 +83,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[0.10.0] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index dcb05ca..e525658 100644
--- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -86,7 +86,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[0.10.1] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index fb4a409..c113bd4 100644
--- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[0.10.2] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index b1aad5d..7df30a7 100644
--- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[0.11.0] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index dc72f2d..163b94b 100644
--- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[1.0] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index a8796cb..5f4c218 100644
--- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[1.1] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 7075be2..9fef2b1 100644
--- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -77,7 +77,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[2.0] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 3e719cf..2e108b2 100644
--- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -77,7 +77,7 @@ public class StreamsUpgradeTest {
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                     }
 
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 91f9052..8b62977 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -159,7 +159,7 @@ class StreamsUpgradeTest(Test):
                 processor.start()
                 monitor.wait_until(self.processed_msg,
                                    timeout_sec=60,
-                                   err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node))
+                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node))
 
             connected_message = "Discovered group coordinator"
             with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
@@ -382,9 +382,9 @@ class StreamsUpgradeTest(Test):
                 log_monitor.wait_until(kafka_version_str,
                                        timeout_sec=60,
                                        err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
-                monitor.wait_until("processed [0-9]* records from topic",
+                monitor.wait_until(self.processed_msg,
                                    timeout_sec=60,
-                                   err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
 
         # start second with <version>
         self.prepare_for(self.processor2, version)
@@ -395,15 +395,16 @@ class StreamsUpgradeTest(Test):
                     self.processor2.start()
                     log_monitor.wait_until(kafka_version_str,
                                            timeout_sec=60,
-                                           err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
-                    first_monitor.wait_until("processed [0-9]* records from topic",
+                                           err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
+                    first_monitor.wait_until(self.processed_msg,
                                              timeout_sec=60,
-                                             err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
-                    second_monitor.wait_until("processed [0-9]* records from topic",
+                                             err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
+                    second_monitor.wait_until(self.processed_msg,
                                               timeout_sec=60,
-                                              err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+                                              err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
+
 
-        # start third with <version>
+                    # start third with <version>
         self.prepare_for(self.processor3, version)
         node3 = self.processor3.node
         with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
@@ -413,16 +414,17 @@ class StreamsUpgradeTest(Test):
                         self.processor3.start()
                         log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
-                                               err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
-                        first_monitor.wait_until("processed [0-9]* records from topic",
+                                               err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
+                        first_monitor.wait_until(self.processed_msg,
                                                  timeout_sec=60,
-                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
-                        second_monitor.wait_until("processed [0-9]* records from topic",
-                                                  timeout_sec=60,
-                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
-                        third_monitor.wait_until("processed [0-9]* records from topic",
+                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
+                        second_monitor.wait_until(self.processed_msg,
                                                   timeout_sec=60,
-                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+                                                  err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
+                        third_monitor.wait_until(self.processed_msg,
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
+
 
     @staticmethod
     def prepare_for(processor, version):
@@ -452,12 +454,12 @@ class StreamsUpgradeTest(Test):
         with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
             with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
                 processor.stop()
-                first_other_monitor.wait_until("processed 100 records from topic",
+                first_other_monitor.wait_until(self.processed_msg,
                                                timeout_sec=60,
-                                               err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
-                second_other_monitor.wait_until("processed 100 records from topic",
+                                               err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
+                second_other_monitor.wait_until(self.processed_msg,
                                                 timeout_sec=60,
-                                                err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+                                                err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
         node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
         if upgrade_from is None:  # upgrade disabled -- second round of rolling bounces
@@ -484,24 +486,25 @@ class StreamsUpgradeTest(Test):
 
                         log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
-                                               err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
-                        first_other_monitor.wait_until("processed 100 records from topic",
+                                               err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
+                        first_other_monitor.wait_until(self.processed_msg,
                                                        timeout_sec=60,
-                                                       err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                                                       err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
                         found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
                         if len(found) > 0:
                             raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
 
-                        second_other_monitor.wait_until("processed 100 records from topic",
+                        second_other_monitor.wait_until(self.processed_msg,
                                                         timeout_sec=60,
-                                                        err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+                                                        err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
                         found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
                         if len(found) > 0:
                             raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
 
-                        monitor.wait_until("processed 100 records from topic",
+                        monitor.wait_until(self.processed_msg,
                                            timeout_sec=60,
-                                           err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
+                                           err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account))
+
 
     def do_rolling_bounce(self, processor, counter, current_generation):
         first_other_processor = None