You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/22 22:38:34 UTC
[kafka] branch trunk updated: MINOR: increase timeout for unstable KTableSourceTopicRestartIntegrationTest (#4445)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2c08d7e MINOR: increase timeout for unstable KTableSourceTopicRestartIntegrationTest (#4445)
2c08d7e is described below
commit 2c08d7e39e3c7e0a65b0255db3c86d3e259cedbf
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Jan 22 14:38:31 2018 -0800
MINOR: increase timeout for unstable KTableSourceTopicRestartIntegrationTest (#4445)
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../KTableSourceTopicRestartIntegrationTest.java | 29 +++++++---------------
1 file changed, 9 insertions(+), 20 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 4942b21..829d3a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -55,8 +55,6 @@ import java.util.concurrent.TimeUnit;
@Category({IntegrationTest.class})
public class KTableSourceTopicRestartIntegrationTest {
-
-
private static final int NUM_BROKERS = 3;
private static final String SOURCE_TOPIC = "source-topic";
@@ -72,10 +70,8 @@ public class KTableSourceTopicRestartIntegrationTest {
private Map<String, String> expectedInitialResultsMap;
private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
-
@BeforeClass
public static void setUpBeforeAllTests() throws Exception {
-
CLUSTER.createTopic(SOURCE_TOPIC);
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
@@ -97,7 +93,6 @@ public class KTableSourceTopicRestartIntegrationTest {
@Before
public void before() {
-
final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC);
kTable.toStream().foreach(new ForeachAction<String, String>() {
@Override
@@ -115,10 +110,8 @@ public class KTableSourceTopicRestartIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
-
@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
-
try {
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streamsOne.start();
@@ -136,7 +129,6 @@ public class KTableSourceTopicRestartIntegrationTest {
produceKeyValues("f", "g", "h");
assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
-
} finally {
streamsOne.close(5, TimeUnit.SECONDS);
}
@@ -144,7 +136,6 @@ public class KTableSourceTopicRestartIntegrationTest {
@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
-
try {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
@@ -163,7 +154,6 @@ public class KTableSourceTopicRestartIntegrationTest {
produceKeyValues("f", "g", "h");
assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
-
} finally {
streamsOne.close(5, TimeUnit.SECONDS);
}
@@ -171,7 +161,6 @@ public class KTableSourceTopicRestartIntegrationTest {
@Test
public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
-
try {
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streamsOne.start();
@@ -189,7 +178,6 @@ public class KTableSourceTopicRestartIntegrationTest {
final Map<String, String> expectedValues = createExpectedResultsMap("a", "b", "c", "f", "g", "h");
assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");
-
} finally {
streamsOne.close(5, TimeUnit.SECONDS);
}
@@ -198,14 +186,15 @@ public class KTableSourceTopicRestartIntegrationTest {
private void assertNumberValuesRead(final Map<String, String> valueMap,
final Map<String, String> expectedMap,
final String errorMessage) throws InterruptedException {
-
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return valueMap.equals(expectedMap);
- }
- }, errorMessage);
-
+ TestUtils.waitForCondition(
+ new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return valueMap.equals(expectedMap);
+ }
+ },
+ 30 * 1000L,
+ errorMessage);
}
private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.