You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/05 19:50:55 UTC
[kafka] branch trunk updated: MINOR: Tighten up metadata upgrade test (#6531)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4aa2cfe MINOR: Tighten up metadata upgrade test (#6531)
4aa2cfe is described below
commit 4aa2cfe467b9ccae1b7d8a83777f900b6a526f59
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri Apr 5 12:50:42 2019 -0700
MINOR: Tighten up metadata upgrade test (#6531)
Reviewers: Bill Bejeck <bb...@gmail.com>
---
.../apache/kafka/streams/tests/SmokeTestUtil.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 2 +-
.../tests/streams/streams_upgrade_test.py | 59 ++++++++++++----------
10 files changed, 40 insertions(+), 37 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index aa58d44..90e6ccd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -49,7 +49,7 @@ public class SmokeTestUtil {
@Override
public void init(final ProcessorContext context) {
super.init(context);
- System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
+ System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 32f96a0..c2d8c4d 100644
--- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -83,7 +83,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[0.10.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index dcb05ca..e525658 100644
--- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -86,7 +86,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[0.10.1] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index fb4a409..c113bd4 100644
--- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[0.10.2] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index b1aad5d..7df30a7 100644
--- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[0.11.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index dc72f2d..163b94b 100644
--- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[1.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index a8796cb..5f4c218 100644
--- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[1.1] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 7075be2..9fef2b1 100644
--- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -77,7 +77,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[2.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 3e719cf..2e108b2 100644
--- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -77,7 +77,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
- System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 91f9052..8b62977 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -159,7 +159,7 @@ class StreamsUpgradeTest(Test):
processor.start()
monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node))
connected_message = "Discovered group coordinator"
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
@@ -382,9 +382,9 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
- monitor.wait_until("processed [0-9]* records from topic",
+ monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
# start second with <version>
self.prepare_for(self.processor2, version)
@@ -395,15 +395,16 @@ class StreamsUpgradeTest(Test):
self.processor2.start()
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
- err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
- first_monitor.wait_until("processed [0-9]* records from topic",
+ err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
+ first_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
- second_monitor.wait_until("processed [0-9]* records from topic",
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
+ second_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
+
- # start third with <version>
+ # start third with <version>
self.prepare_for(self.processor3, version)
node3 = self.processor3.node
with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
@@ -413,16 +414,17 @@ class StreamsUpgradeTest(Test):
self.processor3.start()
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
- err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
- first_monitor.wait_until("processed [0-9]* records from topic",
+ err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
+ first_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
- second_monitor.wait_until("processed [0-9]* records from topic",
- timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
- third_monitor.wait_until("processed [0-9]* records from topic",
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
+ second_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
+ third_monitor.wait_until(self.processed_msg,
+ timeout_sec=60,
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
+
@staticmethod
def prepare_for(processor, version):
@@ -452,12 +454,12 @@ class StreamsUpgradeTest(Test):
with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
processor.stop()
- first_other_monitor.wait_until("processed 100 records from topic",
+ first_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
- second_other_monitor.wait_until("processed 100 records from topic",
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
+ second_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
if upgrade_from is None: # upgrade disabled -- second round of rolling bounces
@@ -484,24 +486,25 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
- err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
- first_other_monitor.wait_until("processed 100 records from topic",
+ err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
+ first_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
- second_other_monitor.wait_until("processed 100 records from topic",
+ second_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
- monitor.wait_until("processed 100 records from topic",
+ monitor.wait_until(self.processed_msg,
timeout_sec=60,
- err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
+ err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account))
+
def do_rolling_bounce(self, processor, counter, current_generation):
first_other_processor = None