You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/22 22:38:34 UTC

[kafka] branch trunk updated: MINOR: increase timeout for unstable KTableSourceTopicRestartIntegrationTest (#4445)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c08d7e  MINOR: increase timeout for unstable KTableSourceTopicRestartIntegrationTest (#4445)
2c08d7e is described below

commit 2c08d7e39e3c7e0a65b0255db3c86d3e259cedbf
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Jan 22 14:38:31 2018 -0800

    MINOR: increase timeout for unstable KTableSourceTopicRestartIntegrationTest (#4445)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../KTableSourceTopicRestartIntegrationTest.java   | 29 +++++++---------------
 1 file changed, 9 insertions(+), 20 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 4942b21..829d3a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -55,8 +55,6 @@ import java.util.concurrent.TimeUnit;
 
 @Category({IntegrationTest.class})
 public class KTableSourceTopicRestartIntegrationTest {
-
-
     private static final int NUM_BROKERS = 3;
     private static final String SOURCE_TOPIC = "source-topic";
 
@@ -72,10 +70,8 @@ public class KTableSourceTopicRestartIntegrationTest {
     private Map<String, String> expectedInitialResultsMap;
     private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
 
-
     @BeforeClass
     public static void setUpBeforeAllTests() throws Exception {
-
         CLUSTER.createTopic(SOURCE_TOPIC);
 
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
@@ -97,7 +93,6 @@ public class KTableSourceTopicRestartIntegrationTest {
 
     @Before
     public void before() {
-
         final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC);
         kTable.toStream().foreach(new ForeachAction<String, String>() {
             @Override
@@ -115,10 +110,8 @@ public class KTableSourceTopicRestartIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
     }
 
-
     @Test
     public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
-
         try {
             streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
             streamsOne.start();
@@ -136,7 +129,6 @@ public class KTableSourceTopicRestartIntegrationTest {
             produceKeyValues("f", "g", "h");
 
             assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
-
         } finally {
             streamsOne.close(5, TimeUnit.SECONDS);
         }
@@ -144,7 +136,6 @@ public class KTableSourceTopicRestartIntegrationTest {
 
     @Test
     public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
-
         try {
             STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
             streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
@@ -163,7 +154,6 @@ public class KTableSourceTopicRestartIntegrationTest {
             produceKeyValues("f", "g", "h");
 
             assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
-
         } finally {
             streamsOne.close(5, TimeUnit.SECONDS);
         }
@@ -171,7 +161,6 @@ public class KTableSourceTopicRestartIntegrationTest {
 
     @Test
     public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
-
         try {
             streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
             streamsOne.start();
@@ -189,7 +178,6 @@ public class KTableSourceTopicRestartIntegrationTest {
             final Map<String, String> expectedValues = createExpectedResultsMap("a", "b", "c", "f", "g", "h");
 
             assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");
-
         } finally {
             streamsOne.close(5, TimeUnit.SECONDS);
         }
@@ -198,14 +186,15 @@ public class KTableSourceTopicRestartIntegrationTest {
     private void assertNumberValuesRead(final Map<String, String> valueMap,
                                         final Map<String, String> expectedMap,
                                         final String errorMessage) throws InterruptedException {
-
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return valueMap.equals(expectedMap);
-            }
-        }, errorMessage);
-
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return valueMap.equals(expectedMap);
+                }
+            },
+            30 * 1000L,
+            errorMessage);
     }
 
     private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.