You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/20 23:05:51 UTC
kafka git commit: KAFKA-4980: testReprocessingFromScratch unit test
failure
Repository: kafka
Updated Branches:
refs/heads/0.10.2 0731d35e5 -> 495cdbde9
KAFKA-4980: testReprocessingFromScratch unit test failure
We got test error `org.apache.kafka.common.errors.TopicExistsException: Topic 'inputTopic' already exists.` in some builds. Can reproduce reliably at local machine. Root cause it async "topic delete" that might not be finished before topic gets re-created.
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Ismael Juma, Damian Guy, Guozhang Wang
Closes #2757 from mjsax/minor-fix-resetintegrationtest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/495cdbde
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/495cdbde
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/495cdbde
Branch: refs/heads/0.10.2
Commit: 495cdbde9e164513c8f982df80c0f29a871b62f2
Parents: 0731d35
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Mar 30 13:30:10 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 20 16:05:31 2017 -0700
----------------------------------------------------------------------
.../integration/ResetIntegrationTest.java | 38 ++++++++++++++++++--
1 file changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/495cdbde/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 27b9ea6..d653d00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,6 +23,7 @@ import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
@@ -115,9 +116,11 @@ public class ResetIntegrationTest {
try {
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
- } catch (GroupCoordinatorNotAvailableException e) {
+ } catch (final GroupCoordinatorNotAvailableException e) {
continue;
- } catch (IllegalArgumentException e) {
+ } catch (final IllegalArgumentException e) {
+ continue;
+ } catch (final TimeoutException e) {
continue;
}
break;
@@ -301,6 +304,9 @@ public class ResetIntegrationTest {
} catch (final UnknownTopicOrPartitionException e) {
// ignore
}
+
+ waitUntilUserTopicsAreDeleted();
+
CLUSTER.createTopic(INPUT_TOPIC);
CLUSTER.createTopic(OUTPUT_TOPIC);
CLUSTER.createTopic(OUTPUT_TOPIC_2);
@@ -403,6 +409,34 @@ public class ResetIntegrationTest {
Assert.assertEquals(0, exitCode);
}
+ private void waitUntilUserTopicsAreDeleted() {
+ ZkUtils zkUtils = null;
+ try {
+ zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled());
+
+ while (userTopicExists(new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())))) {
+ Utils.sleep(100);
+ }
+ } finally {
+ if (zkUtils != null) {
+ zkUtils.close();
+ }
+ }
+ }
+
+ private boolean userTopicExists(final Set<String> allTopics) {
+ final Set<String> expectedMissingTopics = new HashSet<>();
+ expectedMissingTopics.add(INPUT_TOPIC);
+ expectedMissingTopics.add(OUTPUT_TOPIC);
+ expectedMissingTopics.add(OUTPUT_TOPIC_2);
+ expectedMissingTopics.add(OUTPUT_TOPIC_2_RERUN);
+
+ return expectedMissingTopics.removeAll(allTopics);
+ }
+
private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);