You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/04 07:20:19 UTC
kafka git commit: HOTFIX: fix potentially hanging test shouldAddStateStoreToRegexDefinedSource
Repository: kafka
Updated Branches:
refs/heads/trunk ec9e4eafa -> 24e642342
HOTFIX: fix potentially hanging test shouldAddStateStoreToRegexDefinedSource
Author: bbejeck <bb...@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #2783 from bbejeck/HOTFIX_potentially_hanging_test_in_RegexSourceIntegrationTest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24e64234
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24e64234
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24e64234
Branch: refs/heads/trunk
Commit: 24e6423424bf0e41dc05c4a67226e92011e34968
Parents: ec9e4ea
Author: Bill Bejeck <bb...@gmail.com>
Authored: Thu May 4 00:20:16 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu May 4 00:20:16 2017 -0700
----------------------------------------------------------------------
.../integration/RegexSourceIntegrationTest.java | 29 ++++++++++++--------
1 file changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/24e64234/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 5647b1e..011bca6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -64,7 +64,6 @@ import java.util.UUID;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -241,26 +240,34 @@ public class RegexSourceIntegrationTest {
@Test
public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
- ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
- MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+ final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+ final long thirtySecondTimeout = 30 * 1000;
- TopologyBuilder builder = new TopologyBuilder()
+ final TopologyBuilder builder = new TopologyBuilder()
.addSource("ingest", Pattern.compile("topic-\\d+"))
.addProcessor("my-processor", processorSupplier, "ingest")
.addStateStore(stateStoreSupplier, "my-processor");
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
- streams.start();
-
- final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+ try {
+ streams.start();
- IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("message for test"), producerConfig, mockTime);
- streams.close();
+ final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ final Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+ final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
+ return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
+ }
+ };
- Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+ TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
- assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1"));
+ } finally {
+ streams.close();
+ }
}