You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/23 18:53:16 UTC

kafka git commit: MINOR: improve EmbeddedKafkaCluster test utility for deleting topics

Repository: kafka
Updated Branches:
  refs/heads/trunk 70ec4b1d9 -> 495877505


MINOR: improve EmbeddedKafkaCluster test utility for deleting topics

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck, Damian Guy, Eno Thereska, Guozhang Wang

Closes #3104 from mjsax/minor-improve-embedded-kafka-cluster


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49587750
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49587750
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49587750

Branch: refs/heads/trunk
Commit: 49587750570a2aaf332eb73f5aad373196f984ce
Parents: 70ec4b1
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue May 23 11:53:13 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 23 11:53:13 2017 -0700

----------------------------------------------------------------------
 .../integration/FanoutIntegrationTest.java      |   4 +-
 .../GlobalKTableIntegrationTest.java            |   3 +-
 .../InternalTopicIntegrationTest.java           |   3 +-
 .../integration/JoinIntegrationTest.java        |  41 +------
 .../KStreamAggregationIntegrationTest.java      |   3 +-
 .../KStreamKTableJoinIntegrationTest.java       |   4 +-
 ...eamsFineGrainedAutoResetIntegrationTest.java |  47 ++++----
 .../KTableKTableJoinIntegrationTest.java        |   5 +-
 .../QueryableStateIntegrationTest.java          |   7 +-
 .../integration/RegexSourceIntegrationTest.java |  23 ++--
 .../integration/ResetIntegrationTest.java       |  77 +-----------
 .../integration/utils/EmbeddedKafkaCluster.java | 118 ++++++++++++++++++-
 12 files changed, 163 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 8593317..f1f09a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -79,9 +79,7 @@ public class FanoutIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(INPUT_TOPIC_A);
-        CLUSTER.createTopic(OUTPUT_TOPIC_B);
-        CLUSTER.createTopic(OUTPUT_TOPIC_C);
+        CLUSTER.createTopics(INPUT_TOPIC_A, OUTPUT_TOPIC_B, OUTPUT_TOPIC_C);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index e5ed3d8..869c255 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -222,8 +222,7 @@ public class GlobalKTableIntegrationTest {
         inputStream = "input-stream-" + testNo;
         inputTable = "input-table-" + testNo;
         globalOne = "globalOne-" + testNo;
-        CLUSTER.createTopic(inputStream);
-        CLUSTER.createTopic(inputTable);
+        CLUSTER.createTopics(inputStream, inputTable);
         CLUSTER.createTopic(globalOne, 2, 1);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 29cdc1b..3658d10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -77,8 +77,7 @@ public class InternalTopicIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
-        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 040c784..b69e58b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.integration;
 
-import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -36,10 +34,8 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -48,11 +44,9 @@ import org.junit.experimental.categories.Category;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -65,8 +59,6 @@ public class JoinIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
-    private static ZkUtils zkUtils = null;
-
     private static final String APP_ID = "join-integration-test";
     private static final String INPUT_TOPIC_1 = "inputTopicLeft";
     private static final String INPUT_TOPIC_2 = "inputTopicRight";
@@ -107,8 +99,6 @@ public class JoinIntegrationTest {
         }
     };
 
-    private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition();
-
     @BeforeClass
     public static void setupConfigsAndUtils() throws Exception {
         PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -131,25 +121,11 @@ public class JoinIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-
-        zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
-            30000,
-            30000,
-            JaasUtils.isZkSecurityEnabled());
-    }
-
-    @AfterClass
-    public static void release() {
-        if (zkUtils != null) {
-            zkUtils.close();
-        }
     }
 
     @Before
     public void prepareTopology() throws Exception {
-        CLUSTER.createTopic(INPUT_TOPIC_1);
-        CLUSTER.createTopic(INPUT_TOPIC_2);
-        CLUSTER.createTopic(OUTPUT_TOPIC);
+        CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
 
         builder = new KStreamBuilder();
         leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
@@ -160,11 +136,7 @@ public class JoinIntegrationTest {
 
     @After
     public void cleanup() throws Exception {
-        CLUSTER.deleteTopic(INPUT_TOPIC_1);
-        CLUSTER.deleteTopic(INPUT_TOPIC_2);
-        CLUSTER.deleteTopic(OUTPUT_TOPIC);
-
-        TestUtils.waitForCondition(topicsGotDeleted, 120000, "Topics not deleted after 120 seconds.");
+        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
     }
 
     private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
@@ -414,15 +386,6 @@ public class JoinIntegrationTest {
         runTest(expectedResult);
     }
 
-    private final class TopicsGotDeletedCondition implements TestCondition {
-        @Override
-        public boolean conditionMet() {
-            final Set<String> allTopics = new HashSet<>();
-            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-            return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC);
-        }
-    }
-
     private final class Input<V> {
         String topic;
         KeyValue<Long, V> record;

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 0722210..0d5472c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -662,8 +662,7 @@ public class KStreamAggregationIntegrationTest {
         outputTopic = "output-" + testNo;
         userSessionsStream = userSessionsStream + "-" + testNo;
         CLUSTER.createTopic(streamOneInput, 3, 1);
-        CLUSTER.createTopic(userSessionsStream);
-        CLUSTER.createTopic(outputTopic);
+        CLUSTER.createTopics(userSessionsStream, outputTopic);
     }
 
     private void startStreams() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 9b5b428..5a17566 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -80,9 +80,7 @@ public class KStreamKTableJoinIntegrationTest {
         userRegionsTopic = "user-regions-" + testNo;
         userRegionsStoreName = "user-regions-store-name-" + testNo;
         outputTopic = "output-topic-" + testNo;
-        CLUSTER.createTopic(userClicksTopic);
-        CLUSTER.createTopic(userRegionsTopic);
-        CLUSTER.createTopic(outputTopic);
+        CLUSTER.createTopics(userClicksTopic, userRegionsTopic, outputTopic);
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index cff5f43..a868839 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -104,29 +104,30 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(TOPIC_1_0);
-        CLUSTER.createTopic(TOPIC_2_0);
-        CLUSTER.createTopic(TOPIC_A_0);
-        CLUSTER.createTopic(TOPIC_C_0);
-        CLUSTER.createTopic(TOPIC_Y_0);
-        CLUSTER.createTopic(TOPIC_Z_0);
-        CLUSTER.createTopic(TOPIC_1_1);
-        CLUSTER.createTopic(TOPIC_2_1);
-        CLUSTER.createTopic(TOPIC_A_1);
-        CLUSTER.createTopic(TOPIC_C_1);
-        CLUSTER.createTopic(TOPIC_Y_1);
-        CLUSTER.createTopic(TOPIC_Z_1);
-        CLUSTER.createTopic(TOPIC_1_2);
-        CLUSTER.createTopic(TOPIC_2_2);
-        CLUSTER.createTopic(TOPIC_A_2);
-        CLUSTER.createTopic(TOPIC_C_2);
-        CLUSTER.createTopic(TOPIC_Y_2);
-        CLUSTER.createTopic(TOPIC_Z_2);
-        CLUSTER.createTopic(NOOP);
-        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC_0);
-        CLUSTER.createTopic(OUTPUT_TOPIC_1);
-        CLUSTER.createTopic(OUTPUT_TOPIC_2);
+        CLUSTER.createTopics(
+            TOPIC_1_0,
+            TOPIC_2_0,
+            TOPIC_A_0,
+            TOPIC_C_0,
+            TOPIC_Y_0,
+            TOPIC_Z_0,
+            TOPIC_1_1,
+            TOPIC_2_1,
+            TOPIC_A_1,
+            TOPIC_C_1,
+            TOPIC_Y_1,
+            TOPIC_Z_1,
+            TOPIC_1_2,
+            TOPIC_2_2,
+            TOPIC_A_2,
+            TOPIC_C_2,
+            TOPIC_Y_2,
+            TOPIC_Z_2,
+            NOOP,
+            DEFAULT_OUTPUT_TOPIC,
+            OUTPUT_TOPIC_0,
+            OUTPUT_TOPIC_1,
+            OUTPUT_TOPIC_2);
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index c77cf3b..4426a5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -71,10 +71,7 @@ public class KTableKTableJoinIntegrationTest {
 
     @BeforeClass
     public static void beforeTest() throws Exception {
-        CLUSTER.createTopic(TABLE_1);
-        CLUSTER.createTopic(TABLE_2);
-        CLUSTER.createTopic(TABLE_3);
-        CLUSTER.createTopic(OUTPUT);
+        CLUSTER.createTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT);
 
         streamsConfig = new Properties();
         streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index f2d0427..509a7fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -115,13 +115,10 @@ public class QueryableStateIntegrationTest {
         outputTopicConcurrent = outputTopicConcurrent + "-" + testNo;
         outputTopicThree = outputTopicThree + "-" + testNo;
         streamTwo = streamTwo + "-" + testNo;
-        CLUSTER.createTopic(streamOne);
-        CLUSTER.createTopic(streamConcurrent);
+        CLUSTER.createTopics(streamOne, streamConcurrent);
         CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
         CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
-        CLUSTER.createTopic(outputTopic);
-        CLUSTER.createTopic(outputTopicConcurrent);
-        CLUSTER.createTopic(outputTopicThree);
+        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicThree);
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 011bca6..0b5c5e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -97,18 +97,18 @@ public class RegexSourceIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(TOPIC_1);
-        CLUSTER.createTopic(TOPIC_2);
-        CLUSTER.createTopic(TOPIC_A);
-        CLUSTER.createTopic(TOPIC_C);
-        CLUSTER.createTopic(TOPIC_Y);
-        CLUSTER.createTopic(TOPIC_Z);
-        CLUSTER.createTopic(FA_TOPIC);
-        CLUSTER.createTopic(FOO_TOPIC);
+        CLUSTER.createTopics(
+            TOPIC_1,
+            TOPIC_2,
+            TOPIC_A,
+            TOPIC_C,
+            TOPIC_Y,
+            TOPIC_Z,
+            FA_TOPIC,
+            FOO_TOPIC,
+            DEFAULT_OUTPUT_TOPIC);
         CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
         CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
-        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-
     }
 
     @Before
@@ -191,8 +191,7 @@ public class RegexSourceIntegrationTest {
 
         final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
-        CLUSTER.createTopic("TEST-TOPIC-A");
-        CLUSTER.createTopic("TEST-TOPIC-B");
+        CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
 
         final KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 626f38d..31a1465 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,7 +23,6 @@ import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -194,25 +193,7 @@ public class ResetIntegrationTest {
                 "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(INTERMEDIATE_USER_TOPIC);
 
-        CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC);
-        Set<String> allTopics;
-        ZkUtils zkUtils = null;
-        try {
-            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
-                    30000,
-                    30000,
-                    JaasUtils.isZkSecurityEnabled());
-
-            do {
-                Utils.sleep(100);
-                allTopics = new HashSet<>();
-                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-            } while (allTopics.contains(INTERMEDIATE_USER_TOPIC));
-        } finally {
-            if (zkUtils != null) {
-                zkUtils.close();
-            }
-        }
+        CLUSTER.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
     }
 
     @Test
@@ -281,33 +262,7 @@ public class ResetIntegrationTest {
     }
 
     private void prepareInputData() throws Exception {
-        try {
-            CLUSTER.deleteTopic(INPUT_TOPIC);
-        } catch (final UnknownTopicOrPartitionException e) {
-            // ignore
-        }
-        try {
-            CLUSTER.deleteTopic(OUTPUT_TOPIC);
-        } catch (final UnknownTopicOrPartitionException e) {
-            // ignore
-        }
-        try {
-            CLUSTER.deleteTopic(OUTPUT_TOPIC_2);
-        } catch (final UnknownTopicOrPartitionException e) {
-            // ignore
-        }
-        try {
-            CLUSTER.deleteTopic(OUTPUT_TOPIC_2_RERUN);
-        } catch (final UnknownTopicOrPartitionException e) {
-            // ignore
-        }
-
-        waitUntilUserTopicsAreDeleted();
-
-        CLUSTER.createTopic(INPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC);
-        CLUSTER.createTopic(OUTPUT_TOPIC_2);
-        CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
+        CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
 
@@ -406,34 +361,6 @@ public class ResetIntegrationTest {
         Assert.assertEquals(0, exitCode);
     }
 
-    private void waitUntilUserTopicsAreDeleted() {
-        ZkUtils zkUtils = null;
-        try {
-            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
-                30000,
-                30000,
-                JaasUtils.isZkSecurityEnabled());
-
-            while (userTopicExists(new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())))) {
-                Utils.sleep(100);
-            }
-        } finally {
-            if (zkUtils != null) {
-                zkUtils.close();
-            }
-        }
-    }
-
-    private boolean userTopicExists(final Set<String> allTopics) {
-        final Set<String> expectedMissingTopics = new HashSet<>();
-        expectedMissingTopics.add(INPUT_TOPIC);
-        expectedMissingTopics.add(OUTPUT_TOPIC);
-        expectedMissingTopics.add(OUTPUT_TOPIC_2);
-        expectedMissingTopics.add(OUTPUT_TOPIC_2_RERUN);
-
-        return expectedMissingTopics.removeAll(allTopics);
-    }
-
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
         final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
         expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);

http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 6a0fc51..e738bc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -19,16 +19,24 @@ package org.apache.kafka.streams.integration.utils;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
+import kafka.utils.ZkUtils;
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
@@ -38,8 +46,11 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
     private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
     private static final int TOPIC_CREATION_TIMEOUT = 30000;
+    private static final int TOPIC_DELETION_TIMEOUT = 30000;
     private EmbeddedZookeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
+    private ZkUtils zkUtils = null;
+
     private final Properties brokerConfig;
     public final MockTime time;
 
@@ -77,6 +88,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         zookeeper = new EmbeddedZookeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
 
+        zkUtils = ZkUtils.apply(
+            zKConnectString(),
+            30000,
+            30000,
+            JaasUtils.isZkSecurityEnabled());
+
         brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
@@ -109,6 +126,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         for (final KafkaEmbedded broker : brokers) {
             broker.stop();
         }
+        zkUtils.close();
         zookeeper.shutdown();
     }
 
@@ -142,6 +160,17 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     }
 
     /**
+     * Create multiple Kafka topics each with 1 partition and a replication factor of 1.
+     *
+     * @param topics The names of the topics.
+     */
+    public void createTopics(final String... topics) throws InterruptedException {
+        for (final String topic : topics) {
+            createTopic(topic, 1, 1, new Properties());
+        }
+    }
+
+    /**
      * Create a Kafka topic with 1 partition and a replication factor of 1.
      *
      * @param topic The name of the topic.
@@ -181,8 +210,93 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT);
     }
 
-    public void deleteTopic(final String topic) {
-        brokers[0].deleteTopic(topic);
+    /**
+     * Deletes a topic and returns immediately.
+     *
+     * @param topic the name of the topic
+     */
+    public void deleteTopic(final String topic) throws Exception {
+        deleteTopicsAndWait(-1L, topic);
+    }
+
+    /**
+     * Deletes a topic and blocks for max 30 sec until the topic got deleted.
+     *
+     * @param topic the name of the topic
+     */
+    public void deleteTopicAndWait(final String topic) throws Exception {
+        deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
+    }
+
+    /**
+     * Deletes a topic and blocks until the topic got deleted.
+     *
+     * @param timeoutMs the max time to wait for the topic to be deleted (does not block if {@code <= 0})
+     * @param topic the name of the topic
+     */
+    public void deleteTopicAndWait(final long timeoutMs, final String topic) throws Exception {
+        deleteTopicsAndWait(timeoutMs, topic);
+    }
+
+    /**
+     * Deletes multiple topics and returns immediately.
+     *
+     * @param topics the name of the topics
+     */
+    public void deleteTopics(final String... topics) throws Exception {
+        deleteTopicsAndWait(-1, topics);
+    }
+
+    /**
+     * Deletes multiple topics and blocks for max 30 sec until all topics got deleted.
+     *
+     * @param topics the name of the topics
+     */
+    public void deleteTopicsAndWait(final String... topics) throws Exception {
+        deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
+    }
+
+    /**
+     * Deletes multiple topics and blocks until all topics got deleted.
+     *
+     * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
+     * @param topics the name of the topics
+     */
+    public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws Exception {
+        for (final String topic : topics) {
+            try {
+                brokers[0].deleteTopic(topic);
+            } catch (final UnknownTopicOrPartitionException e) { }
+        }
+
+        if (timeoutMs > 0) {
+            TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+        }
+    }
+
+    public void deleteAndRecreateTopics(final String... topics) throws Exception {
+        deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
+        createTopics(topics);
+    }
+
+    public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws Exception {
+        deleteTopicsAndWait(timeoutMs, topics);
+        createTopics(topics);
+    }
+
+    private final class TopicsDeletedCondition implements TestCondition {
+        final Set<String> deletedTopic = new HashSet<>();
+
+        private TopicsDeletedCondition(final String... topics) {
+            Collections.addAll(deletedTopic, topics);
+        }
+
+        @Override
+        public boolean conditionMet() {
+            final Set<String> allTopics = new HashSet<>();
+            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            return !allTopics.removeAll(deletedTopic);
+        }
     }
 
     public List<KafkaServer> brokers() {