You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/24 15:10:21 UTC

[kafka] branch 2.3 updated: MINOR: fix Streams version-probing system test (#6764)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 325816b  MINOR: fix Streams version-probing system test (#6764)
325816b is described below

commit 325816b534b016b9f2262a8b6f2792836f0659b5
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri May 24 08:08:52 2019 -0700

    MINOR: fix Streams version-probing system test (#6764)
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Boyang Chen <bo...@confluent.io>
---
 .../kafka/streams/tests/StreamsUpgradeTest.java      | 20 ++++++++------------
 .../kafkatest/tests/streams/streams_upgrade_test.py  |  7 +++----
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 33e9b97..27bee81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -47,7 +47,6 @@ import java.io.IOException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,16 +89,13 @@ public class StreamsUpgradeTest {
         final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier);
         streams.start();
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                System.out.println("closing Kafka Streams instance");
-                System.out.flush();
-                streams.close();
-                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
-                System.out.flush();
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            System.out.println("closing Kafka Streams instance");
+            System.out.flush();
+            streams.close();
+            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+            System.out.flush();
+        }));
     }
 
     private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier {
@@ -168,7 +164,7 @@ public class StreamsUpgradeTest {
                 assignment.userData().putInt(0, AssignmentInfo.LATEST_SUPPORTED_VERSION));
 
             final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
-            Collections.sort(partitions, PARTITION_COMPARATOR);
+            partitions.sort(PARTITION_COMPARATOR);
 
             // version 1 field
             final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 8b62977..4cb8bb6 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -70,6 +70,7 @@ which are outlined here:
 
 """
 
+
 class StreamsUpgradeTest(Test):
     """
     Test upgrading Kafka Streams (all version combination)
@@ -348,7 +349,7 @@ class StreamsUpgradeTest(Test):
         while retries > 0:
             for p in self.processors:
                 found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
-                if len(found) == self.leader_counter[p] + 1:
+                if len(found) >= self.leader_counter[p] + 1:
                     if self.leader is not None:
                         raise Exception("Could not uniquely identify leader")
                     self.leader = p
@@ -403,8 +404,7 @@ class StreamsUpgradeTest(Test):
                                               timeout_sec=60,
                                               err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
 
-
-                    # start third with <version>
+        # start third with <version>
         self.prepare_for(self.processor3, version)
         node3 = self.processor3.node
         with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
@@ -425,7 +425,6 @@ class StreamsUpgradeTest(Test):
                                                  timeout_sec=60,
                                                  err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
 
-
     @staticmethod
     def prepare_for(processor, version):
         processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)