You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/24 15:10:21 UTC
[kafka] branch 2.3 updated: MINOR: fix Streams version-probing
system test (#6764)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 325816b MINOR: fix Streams version-probing system test (#6764)
325816b is described below
commit 325816b534b016b9f2262a8b6f2792836f0659b5
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri May 24 08:08:52 2019 -0700
MINOR: fix Streams version-probing system test (#6764)
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Boyang Chen <bo...@confluent.io>
---
.../kafka/streams/tests/StreamsUpgradeTest.java | 20 ++++++++------------
.../kafkatest/tests/streams/streams_upgrade_test.py | 7 +++----
2 files changed, 11 insertions(+), 16 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 33e9b97..27bee81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -47,7 +47,6 @@ import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,16 +89,13 @@ public class StreamsUpgradeTest {
final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier);
streams.start();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- System.out.println("closing Kafka Streams instance");
- System.out.flush();
- streams.close();
- System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
- System.out.flush();
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }));
}
private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier {
@@ -168,7 +164,7 @@ public class StreamsUpgradeTest {
assignment.userData().putInt(0, AssignmentInfo.LATEST_SUPPORTED_VERSION));
final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
- Collections.sort(partitions, PARTITION_COMPARATOR);
+ partitions.sort(PARTITION_COMPARATOR);
// version 1 field
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 8b62977..4cb8bb6 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -70,6 +70,7 @@ which are outlined here:
"""
+
class StreamsUpgradeTest(Test):
"""
Test upgrading Kafka Streams (all version combination)
@@ -348,7 +349,7 @@ class StreamsUpgradeTest(Test):
while retries > 0:
for p in self.processors:
found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
- if len(found) == self.leader_counter[p] + 1:
+ if len(found) >= self.leader_counter[p] + 1:
if self.leader is not None:
raise Exception("Could not uniquely identify leader")
self.leader = p
@@ -403,8 +404,7 @@ class StreamsUpgradeTest(Test):
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-
- # start third with <version>
+ # start third with <version>
self.prepare_for(self.processor3, version)
node3 = self.processor3.node
with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
@@ -425,7 +425,6 @@ class StreamsUpgradeTest(Test):
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
-
@staticmethod
def prepare_for(processor, version):
processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)