You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/04 07:20:19 UTC

kafka git commit: HOTFIX: fix potentially hanging test shouldAddStateStoreToRegexDefinedSource

Repository: kafka
Updated Branches:
  refs/heads/trunk ec9e4eafa -> 24e642342


HOTFIX: fix potentially hanging test shouldAddStateStoreToRegexDefinedSource

…dSource

Author: bbejeck <bb...@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2783 from bbejeck/HOTFIX_potentially_hanging_test_in_RegexSourceIntegrationTest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24e64234
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24e64234
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24e64234

Branch: refs/heads/trunk
Commit: 24e6423424bf0e41dc05c4a67226e92011e34968
Parents: ec9e4ea
Author: Bill Bejeck <bb...@gmail.com>
Authored: Thu May 4 00:20:16 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu May 4 00:20:16 2017 -0700

----------------------------------------------------------------------
 .../integration/RegexSourceIntegrationTest.java | 29 ++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/24e64234/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 5647b1e..011bca6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -64,7 +64,6 @@ import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -241,26 +240,34 @@ public class RegexSourceIntegrationTest {
     @Test
     public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
 
-        ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-        MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+        final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+        final long thirtySecondTimeout = 30 * 1000;
 
-        TopologyBuilder builder = new TopologyBuilder()
+        final TopologyBuilder builder = new TopologyBuilder()
                 .addSource("ingest", Pattern.compile("topic-\\d+"))
                 .addProcessor("my-processor", processorSupplier, "ingest")
                 .addStateStore(stateStoreSupplier, "my-processor");
 
 
         final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
-
-        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+        try {
+            streams.start();
 
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("message for test"), producerConfig, mockTime);
-        streams.close();
+            final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+                    final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
+                    return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
+                }
+            };
 
-        Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+            TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
 
-        assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1"));
+        } finally {
+            streams.close();
+        }
     }