You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/22 23:09:06 UTC

kafka git commit: KAFKA-4049: Fix transient failure in RegexSourceIntegrationTest

Repository: kafka
Updated Branches:
  refs/heads/trunk dedacd06e -> f90321553


KAFKA-4049: Fix transient failure in RegexSourceIntegrationTest

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <me...@ewencp.org>

Closes #1746 from guozhangwang/K4049-RegexSourceIntegrationTest-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9032155
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9032155
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9032155

Branch: refs/heads/trunk
Commit: f903215536af06b7b79739882d9286abc2e50000
Parents: dedacd0
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Aug 22 16:09:02 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Aug 22 16:09:02 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   |  1 -
 .../integration/RegexSourceIntegrationTest.java | 62 +++++++-------------
 2 files changed, 21 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9032155/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 4baa63b..44026be 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -247,7 +247,6 @@ public class TestUtils {
     public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException {
         long startTime = System.currentTimeMillis();
 
-
         while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
             Thread.sleep(Math.min(maxWaitMs, 100L));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9032155/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 0892893..51fa06a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -52,9 +52,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.regex.Pattern;
@@ -81,9 +79,6 @@ public class RegexSourceIntegrationTest {
     private static final String FA_TOPIC = "fa";
     private static final String FOO_TOPIC = "foo";
 
-    private static final int FIRST_UPDATE = 0;
-    private static final int SECOND_UPDATE = 1;
-
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
@@ -121,6 +116,8 @@ public class RegexSourceIntegrationTest {
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
 
         final Serde<String> stringSerde = Serdes.String();
+        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
+        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
         StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
@@ -146,41 +143,35 @@ public class RegexSourceIntegrationTest {
         TestCondition oneTopicAdded  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
-                return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && !assignedTopics.contains("TEST-TOPIC-2");
+                return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
             }
         };
 
         streamThreads[0] = testStreamThread;
         streams.start();
 
-        TestUtils.waitForCondition(oneTopicAdded,  STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.createTopic("TEST-TOPIC-2");
 
         TestCondition secondTopicAdded  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
-                return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && assignedTopics.contains("TEST-TOPIC-2");
+                return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
             }
         };
 
-        TestUtils.waitForCondition(secondTopicAdded,  STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
-
-        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
-        List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
-
-        assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
-        assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
     }
 
     @Test
     public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
 
         final Serde<String> stringSerde = Serdes.String();
+        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
+        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
 
         StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
@@ -209,34 +200,25 @@ public class RegexSourceIntegrationTest {
         TestCondition bothTopicsAdded  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
-                return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B");
+                return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
             }
         };
         streams.start();
 
-        TestUtils.waitForCondition(bothTopicsAdded,  STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-
         TestCondition oneTopicRemoved  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
-                return assignedTopics != null && !assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B");
+                return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
             }
         };
 
-        TestUtils.waitForCondition(oneTopicRemoved,  STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
-
-        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
-        List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
-
-        assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
-        assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
     }
 
 
@@ -291,7 +273,7 @@ public class RegexSourceIntegrationTest {
         assertThat(actualValues, equalTo(expectedReceivedValues));
     }
 
-    //TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
+    // TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
     @Test(expected = AssertionError.class)
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
 
@@ -304,8 +286,8 @@ public class RegexSourceIntegrationTest {
         KStreamBuilder builder = new KStreamBuilder();
 
 
-        //  overlapping patterns here, no messages should be sent as TopologyBuilderException
-        //  will be thrown when the processor topology is built.
+        // overlapping patterns here, no messages should be sent as TopologyBuilderException
+        // will be thrown when the processor topology is built.
 
         KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
         KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
@@ -334,9 +316,7 @@ public class RegexSourceIntegrationTest {
     }
 
     private class TestStreamThread extends StreamThread {
-
-        public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
-        private int index =  0;
+        public volatile List<String> assignedTopicPartitions = new ArrayList<>();
 
         public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
             super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
@@ -344,15 +324,15 @@ public class RegexSourceIntegrationTest {
 
         @Override
         public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
-            List<String> assignedTopics = new ArrayList<>();
+            List<String> topicPartitions = new ArrayList<>();
             for (TopicPartition partition : partitions) {
-                assignedTopics.add(partition.topic());
+                topicPartitions.add(partition.topic());
             }
-            Collections.sort(assignedTopics);
-            assignedTopicPartitions.put(index++, assignedTopics);
+            Collections.sort(topicPartitions);
+
+            assignedTopicPartitions = topicPartitions;
             return super.createStreamTask(id, partitions);
         }
 
     }
-
 }