You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/22 23:09:06 UTC
kafka git commit: KAFKA-4049: Fix transient failure in
RegexSourceIntegrationTest
Repository: kafka
Updated Branches:
refs/heads/trunk dedacd06e -> f90321553
KAFKA-4049: Fix transient failure in RegexSourceIntegrationTest
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <me...@ewencp.org>
Closes #1746 from guozhangwang/K4049-RegexSourceIntegrationTest-failure
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9032155
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9032155
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9032155
Branch: refs/heads/trunk
Commit: f903215536af06b7b79739882d9286abc2e50000
Parents: dedacd0
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Aug 22 16:09:02 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Aug 22 16:09:02 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/test/TestUtils.java | 1 -
.../integration/RegexSourceIntegrationTest.java | 62 +++++++-------------
2 files changed, 21 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f9032155/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 4baa63b..44026be 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -247,7 +247,6 @@ public class TestUtils {
public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException {
long startTime = System.currentTimeMillis();
-
while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
Thread.sleep(Math.min(maxWaitMs, 100L));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f9032155/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 0892893..51fa06a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -52,9 +52,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
@@ -81,9 +79,6 @@ public class RegexSourceIntegrationTest {
private static final String FA_TOPIC = "fa";
private static final String FOO_TOPIC = "foo";
- private static final int FIRST_UPDATE = 0;
- private static final int SECOND_UPDATE = 1;
-
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
private Properties streamsConfiguration;
@@ -121,6 +116,8 @@ public class RegexSourceIntegrationTest {
public void testRegexMatchesTopicsAWhenCreated() throws Exception {
final Serde<String> stringSerde = Serdes.String();
+ final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
+ final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
@@ -146,41 +143,35 @@ public class RegexSourceIntegrationTest {
TestCondition oneTopicAdded = new TestCondition() {
@Override
public boolean conditionMet() {
- List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
- return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && !assignedTopics.contains("TEST-TOPIC-2");
+ return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
}
};
streamThreads[0] = testStreamThread;
streams.start();
- TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED);
CLUSTER.createTopic("TEST-TOPIC-2");
TestCondition secondTopicAdded = new TestCondition() {
@Override
public boolean conditionMet() {
- List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
- return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && assignedTopics.contains("TEST-TOPIC-2");
+ return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
}
};
- TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED);
streams.close();
-
- List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
- List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
-
- assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
- assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
}
@Test
public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
final Serde<String> stringSerde = Serdes.String();
+ final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
+ final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
@@ -209,34 +200,25 @@ public class RegexSourceIntegrationTest {
TestCondition bothTopicsAdded = new TestCondition() {
@Override
public boolean conditionMet() {
- List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
- return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B");
+ return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
}
};
streams.start();
- TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED);
CLUSTER.deleteTopic("TEST-TOPIC-A");
-
TestCondition oneTopicRemoved = new TestCondition() {
@Override
public boolean conditionMet() {
- List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
- return assignedTopics != null && !assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B");
+ return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
}
};
- TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED);
streams.close();
-
- List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
- List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
-
- assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
- assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
}
@@ -291,7 +273,7 @@ public class RegexSourceIntegrationTest {
assertThat(actualValues, equalTo(expectedReceivedValues));
}
- //TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
+ // TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
@Test(expected = AssertionError.class)
public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
@@ -304,8 +286,8 @@ public class RegexSourceIntegrationTest {
KStreamBuilder builder = new KStreamBuilder();
- // overlapping patterns here, no messages should be sent as TopologyBuilderException
- // will be thrown when the processor topology is built.
+ // overlapping patterns here, no messages should be sent as TopologyBuilderException
+ // will be thrown when the processor topology is built.
KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
@@ -334,9 +316,7 @@ public class RegexSourceIntegrationTest {
}
private class TestStreamThread extends StreamThread {
-
- public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
- private int index = 0;
+ public volatile List<String> assignedTopicPartitions = new ArrayList<>();
public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
@@ -344,15 +324,15 @@ public class RegexSourceIntegrationTest {
@Override
public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
- List<String> assignedTopics = new ArrayList<>();
+ List<String> topicPartitions = new ArrayList<>();
for (TopicPartition partition : partitions) {
- assignedTopics.add(partition.topic());
+ topicPartitions.add(partition.topic());
}
- Collections.sort(assignedTopics);
- assignedTopicPartitions.put(index++, assignedTopics);
+ Collections.sort(topicPartitions);
+
+ assignedTopicPartitions = topicPartitions;
return super.createStreamTask(id, partitions);
}
}
-
}