You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/30 20:30:13 UTC
kafka git commit: KAFKA-4980: testReprocessingFromScratch unit test
failure
Repository: kafka
Updated Branches:
refs/heads/trunk 2e075fe6a -> 92b7d7570
KAFKA-4980: testReprocessingFromScratch unit test failure
We got the test error `org.apache.kafka.common.errors.TopicExistsException: Topic 'inputTopic' already exists.` in some builds. It can be reproduced reliably on a local machine. The root cause is the asynchronous "topic delete", which might not have finished before the topic gets re-created.
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Ismael Juma, Damian Guy, Guozhang Wang
Closes #2757 from mjsax/minor-fix-resetintegrationtest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92b7d757
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92b7d757
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92b7d757
Branch: refs/heads/trunk
Commit: 92b7d7570027cdc4e069862472ce6ec6b3dad04a
Parents: 2e075fe
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Mar 30 13:30:10 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 30 13:30:10 2017 -0700
----------------------------------------------------------------------
.../integration/ResetIntegrationTest.java | 33 +++++++++++++++++++-
1 file changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92b7d757/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 695e900..cf35902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -117,7 +117,7 @@ public class ResetIntegrationTest {
try {
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- } catch (TimeoutException e) {
+ } catch (final TimeoutException e) {
continue;
}
break;
@@ -300,6 +300,9 @@ public class ResetIntegrationTest {
} catch (final UnknownTopicOrPartitionException e) {
// ignore
}
+
+ waitUntilUserTopicsAreDeleted();
+
CLUSTER.createTopic(INPUT_TOPIC);
CLUSTER.createTopic(OUTPUT_TOPIC);
CLUSTER.createTopic(OUTPUT_TOPIC_2);
@@ -402,6 +405,34 @@ public class ResetIntegrationTest {
Assert.assertEquals(0, exitCode);
}
+ private void waitUntilUserTopicsAreDeleted() {
+ ZkUtils zkUtils = null;
+ try {
+ zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled());
+
+ while (userTopicExists(new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())))) {
+ Utils.sleep(100);
+ }
+ } finally {
+ if (zkUtils != null) {
+ zkUtils.close();
+ }
+ }
+ }
+
+ private boolean userTopicExists(final Set<String> allTopics) {
+ final Set<String> expectedMissingTopics = new HashSet<>();
+ expectedMissingTopics.add(INPUT_TOPIC);
+ expectedMissingTopics.add(OUTPUT_TOPIC);
+ expectedMissingTopics.add(OUTPUT_TOPIC_2);
+ expectedMissingTopics.add(OUTPUT_TOPIC_2_RERUN);
+
+ return expectedMissingTopics.removeAll(allTopics);
+ }
+
private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);