You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/23 18:53:16 UTC
kafka git commit: MINOR: improve EmbeddedKafkaCluster test utility
for deleting topics
Repository: kafka
Updated Branches:
refs/heads/trunk 70ec4b1d9 -> 495877505
MINOR: improve EmbeddedKafkaCluster test utility for deleting topics
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck, Damian Guy, Eno Thereska, Guozhang Wang
Closes #3104 from mjsax/minor-improve-embedded-kafka-cluster
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49587750
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49587750
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49587750
Branch: refs/heads/trunk
Commit: 49587750570a2aaf332eb73f5aad373196f984ce
Parents: 70ec4b1
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue May 23 11:53:13 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 23 11:53:13 2017 -0700
----------------------------------------------------------------------
.../integration/FanoutIntegrationTest.java | 4 +-
.../GlobalKTableIntegrationTest.java | 3 +-
.../InternalTopicIntegrationTest.java | 3 +-
.../integration/JoinIntegrationTest.java | 41 +------
.../KStreamAggregationIntegrationTest.java | 3 +-
.../KStreamKTableJoinIntegrationTest.java | 4 +-
...eamsFineGrainedAutoResetIntegrationTest.java | 47 ++++----
.../KTableKTableJoinIntegrationTest.java | 5 +-
.../QueryableStateIntegrationTest.java | 7 +-
.../integration/RegexSourceIntegrationTest.java | 23 ++--
.../integration/ResetIntegrationTest.java | 77 +-----------
.../integration/utils/EmbeddedKafkaCluster.java | 118 ++++++++++++++++++-
12 files changed, 163 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 8593317..f1f09a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -79,9 +79,7 @@ public class FanoutIntegrationTest {
@BeforeClass
public static void startKafkaCluster() throws Exception {
- CLUSTER.createTopic(INPUT_TOPIC_A);
- CLUSTER.createTopic(OUTPUT_TOPIC_B);
- CLUSTER.createTopic(OUTPUT_TOPIC_C);
+ CLUSTER.createTopics(INPUT_TOPIC_A, OUTPUT_TOPIC_B, OUTPUT_TOPIC_C);
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index e5ed3d8..869c255 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -222,8 +222,7 @@ public class GlobalKTableIntegrationTest {
inputStream = "input-stream-" + testNo;
inputTable = "input-table-" + testNo;
globalOne = "globalOne-" + testNo;
- CLUSTER.createTopic(inputStream);
- CLUSTER.createTopic(inputTable);
+ CLUSTER.createTopics(inputStream, inputTable);
CLUSTER.createTopic(globalOne, 2, 1);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 29cdc1b..3658d10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -77,8 +77,7 @@ public class InternalTopicIntegrationTest {
@BeforeClass
public static void startKafkaCluster() throws Exception {
- CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
- CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+ CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
}
@Before
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 040c784..b69e58b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.integration;
-import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -36,10 +34,8 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -48,11 +44,9 @@ import org.junit.experimental.categories.Category;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -65,8 +59,6 @@ public class JoinIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
- private static ZkUtils zkUtils = null;
-
private static final String APP_ID = "join-integration-test";
private static final String INPUT_TOPIC_1 = "inputTopicLeft";
private static final String INPUT_TOPIC_2 = "inputTopicRight";
@@ -107,8 +99,6 @@ public class JoinIntegrationTest {
}
};
- private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition();
-
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -131,25 +121,11 @@ public class JoinIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-
- zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled());
- }
-
- @AfterClass
- public static void release() {
- if (zkUtils != null) {
- zkUtils.close();
- }
}
@Before
public void prepareTopology() throws Exception {
- CLUSTER.createTopic(INPUT_TOPIC_1);
- CLUSTER.createTopic(INPUT_TOPIC_2);
- CLUSTER.createTopic(OUTPUT_TOPIC);
+ CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
builder = new KStreamBuilder();
leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
@@ -160,11 +136,7 @@ public class JoinIntegrationTest {
@After
public void cleanup() throws Exception {
- CLUSTER.deleteTopic(INPUT_TOPIC_1);
- CLUSTER.deleteTopic(INPUT_TOPIC_2);
- CLUSTER.deleteTopic(OUTPUT_TOPIC);
-
- TestUtils.waitForCondition(topicsGotDeleted, 120000, "Topics not deleted after 120 seconds.");
+ CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
}
private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
@@ -414,15 +386,6 @@ public class JoinIntegrationTest {
runTest(expectedResult);
}
- private final class TopicsGotDeletedCondition implements TestCondition {
- @Override
- public boolean conditionMet() {
- final Set<String> allTopics = new HashSet<>();
- allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
- return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC);
- }
- }
-
private final class Input<V> {
String topic;
KeyValue<Long, V> record;
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 0722210..0d5472c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -662,8 +662,7 @@ public class KStreamAggregationIntegrationTest {
outputTopic = "output-" + testNo;
userSessionsStream = userSessionsStream + "-" + testNo;
CLUSTER.createTopic(streamOneInput, 3, 1);
- CLUSTER.createTopic(userSessionsStream);
- CLUSTER.createTopic(outputTopic);
+ CLUSTER.createTopics(userSessionsStream, outputTopic);
}
private void startStreams() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 9b5b428..5a17566 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -80,9 +80,7 @@ public class KStreamKTableJoinIntegrationTest {
userRegionsTopic = "user-regions-" + testNo;
userRegionsStoreName = "user-regions-store-name-" + testNo;
outputTopic = "output-topic-" + testNo;
- CLUSTER.createTopic(userClicksTopic);
- CLUSTER.createTopic(userRegionsTopic);
- CLUSTER.createTopic(outputTopic);
+ CLUSTER.createTopics(userClicksTopic, userRegionsTopic, outputTopic);
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index cff5f43..a868839 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -104,29 +104,30 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
@BeforeClass
public static void startKafkaCluster() throws Exception {
- CLUSTER.createTopic(TOPIC_1_0);
- CLUSTER.createTopic(TOPIC_2_0);
- CLUSTER.createTopic(TOPIC_A_0);
- CLUSTER.createTopic(TOPIC_C_0);
- CLUSTER.createTopic(TOPIC_Y_0);
- CLUSTER.createTopic(TOPIC_Z_0);
- CLUSTER.createTopic(TOPIC_1_1);
- CLUSTER.createTopic(TOPIC_2_1);
- CLUSTER.createTopic(TOPIC_A_1);
- CLUSTER.createTopic(TOPIC_C_1);
- CLUSTER.createTopic(TOPIC_Y_1);
- CLUSTER.createTopic(TOPIC_Z_1);
- CLUSTER.createTopic(TOPIC_1_2);
- CLUSTER.createTopic(TOPIC_2_2);
- CLUSTER.createTopic(TOPIC_A_2);
- CLUSTER.createTopic(TOPIC_C_2);
- CLUSTER.createTopic(TOPIC_Y_2);
- CLUSTER.createTopic(TOPIC_Z_2);
- CLUSTER.createTopic(NOOP);
- CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
- CLUSTER.createTopic(OUTPUT_TOPIC_0);
- CLUSTER.createTopic(OUTPUT_TOPIC_1);
- CLUSTER.createTopic(OUTPUT_TOPIC_2);
+ CLUSTER.createTopics(
+ TOPIC_1_0,
+ TOPIC_2_0,
+ TOPIC_A_0,
+ TOPIC_C_0,
+ TOPIC_Y_0,
+ TOPIC_Z_0,
+ TOPIC_1_1,
+ TOPIC_2_1,
+ TOPIC_A_1,
+ TOPIC_C_1,
+ TOPIC_Y_1,
+ TOPIC_Z_1,
+ TOPIC_1_2,
+ TOPIC_2_2,
+ TOPIC_A_2,
+ TOPIC_C_2,
+ TOPIC_Y_2,
+ TOPIC_Z_2,
+ NOOP,
+ DEFAULT_OUTPUT_TOPIC,
+ OUTPUT_TOPIC_0,
+ OUTPUT_TOPIC_1,
+ OUTPUT_TOPIC_2);
}
@Before
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index c77cf3b..4426a5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -71,10 +71,7 @@ public class KTableKTableJoinIntegrationTest {
@BeforeClass
public static void beforeTest() throws Exception {
- CLUSTER.createTopic(TABLE_1);
- CLUSTER.createTopic(TABLE_2);
- CLUSTER.createTopic(TABLE_3);
- CLUSTER.createTopic(OUTPUT);
+ CLUSTER.createTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT);
streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index f2d0427..509a7fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -115,13 +115,10 @@ public class QueryableStateIntegrationTest {
outputTopicConcurrent = outputTopicConcurrent + "-" + testNo;
outputTopicThree = outputTopicThree + "-" + testNo;
streamTwo = streamTwo + "-" + testNo;
- CLUSTER.createTopic(streamOne);
- CLUSTER.createTopic(streamConcurrent);
+ CLUSTER.createTopics(streamOne, streamConcurrent);
CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
- CLUSTER.createTopic(outputTopic);
- CLUSTER.createTopic(outputTopicConcurrent);
- CLUSTER.createTopic(outputTopicThree);
+ CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicThree);
}
@Before
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 011bca6..0b5c5e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -97,18 +97,18 @@ public class RegexSourceIntegrationTest {
@BeforeClass
public static void startKafkaCluster() throws Exception {
- CLUSTER.createTopic(TOPIC_1);
- CLUSTER.createTopic(TOPIC_2);
- CLUSTER.createTopic(TOPIC_A);
- CLUSTER.createTopic(TOPIC_C);
- CLUSTER.createTopic(TOPIC_Y);
- CLUSTER.createTopic(TOPIC_Z);
- CLUSTER.createTopic(FA_TOPIC);
- CLUSTER.createTopic(FOO_TOPIC);
+ CLUSTER.createTopics(
+ TOPIC_1,
+ TOPIC_2,
+ TOPIC_A,
+ TOPIC_C,
+ TOPIC_Y,
+ TOPIC_Z,
+ FA_TOPIC,
+ FOO_TOPIC,
+ DEFAULT_OUTPUT_TOPIC);
CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
- CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-
}
@Before
@@ -191,8 +191,7 @@ public class RegexSourceIntegrationTest {
final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
- CLUSTER.createTopic("TEST-TOPIC-A");
- CLUSTER.createTopic("TEST-TOPIC-B");
+ CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
final KStreamBuilder builder = new KStreamBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 626f38d..31a1465 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,7 +23,6 @@ import kafka.utils.MockTime;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -194,25 +193,7 @@ public class ResetIntegrationTest {
"Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
cleanGlobal(INTERMEDIATE_USER_TOPIC);
- CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC);
- Set<String> allTopics;
- ZkUtils zkUtils = null;
- try {
- zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled());
-
- do {
- Utils.sleep(100);
- allTopics = new HashSet<>();
- allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
- } while (allTopics.contains(INTERMEDIATE_USER_TOPIC));
- } finally {
- if (zkUtils != null) {
- zkUtils.close();
- }
- }
+ CLUSTER.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
}
@Test
@@ -281,33 +262,7 @@ public class ResetIntegrationTest {
}
private void prepareInputData() throws Exception {
- try {
- CLUSTER.deleteTopic(INPUT_TOPIC);
- } catch (final UnknownTopicOrPartitionException e) {
- // ignore
- }
- try {
- CLUSTER.deleteTopic(OUTPUT_TOPIC);
- } catch (final UnknownTopicOrPartitionException e) {
- // ignore
- }
- try {
- CLUSTER.deleteTopic(OUTPUT_TOPIC_2);
- } catch (final UnknownTopicOrPartitionException e) {
- // ignore
- }
- try {
- CLUSTER.deleteTopic(OUTPUT_TOPIC_2_RERUN);
- } catch (final UnknownTopicOrPartitionException e) {
- // ignore
- }
-
- waitUntilUserTopicsAreDeleted();
-
- CLUSTER.createTopic(INPUT_TOPIC);
- CLUSTER.createTopic(OUTPUT_TOPIC);
- CLUSTER.createTopic(OUTPUT_TOPIC_2);
- CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN);
+ CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
@@ -406,34 +361,6 @@ public class ResetIntegrationTest {
Assert.assertEquals(0, exitCode);
}
- private void waitUntilUserTopicsAreDeleted() {
- ZkUtils zkUtils = null;
- try {
- zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled());
-
- while (userTopicExists(new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())))) {
- Utils.sleep(100);
- }
- } finally {
- if (zkUtils != null) {
- zkUtils.close();
- }
- }
- }
-
- private boolean userTopicExists(final Set<String> allTopics) {
- final Set<String> expectedMissingTopics = new HashSet<>();
- expectedMissingTopics.add(INPUT_TOPIC);
- expectedMissingTopics.add(OUTPUT_TOPIC);
- expectedMissingTopics.add(OUTPUT_TOPIC_2);
- expectedMissingTopics.add(OUTPUT_TOPIC_2_RERUN);
-
- return expectedMissingTopics.removeAll(allTopics);
- }
-
private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
http://git-wip-us.apache.org/repos/asf/kafka/blob/49587750/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 6a0fc51..e738bc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -19,16 +19,24 @@ package org.apache.kafka.streams.integration.utils;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
+import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
/**
* Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
@@ -38,8 +46,11 @@ public class EmbeddedKafkaCluster extends ExternalResource {
private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
private static final int TOPIC_CREATION_TIMEOUT = 30000;
+ private static final int TOPIC_DELETION_TIMEOUT = 30000;
private EmbeddedZookeeper zookeeper = null;
private final KafkaEmbedded[] brokers;
+ private ZkUtils zkUtils = null;
+
private final Properties brokerConfig;
public final MockTime time;
@@ -77,6 +88,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
zookeeper = new EmbeddedZookeeper();
log.debug("ZooKeeper instance is running at {}", zKConnectString());
+ zkUtils = ZkUtils.apply(
+ zKConnectString(),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled());
+
brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
@@ -109,6 +126,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
for (final KafkaEmbedded broker : brokers) {
broker.stop();
}
+ zkUtils.close();
zookeeper.shutdown();
}
@@ -142,6 +160,17 @@ public class EmbeddedKafkaCluster extends ExternalResource {
}
/**
+ * Creates multiple Kafka topics, each with 1 partition and a replication factor of 1.
+ *
+ * @param topics the names of the topics to create
+ */
+ public void createTopics(final String... topics) throws InterruptedException {
+ for (final String topic : topics) {
+ createTopic(topic, 1, 1, new Properties());
+ }
+ }
+
+ /**
* Create a Kafka topic with 1 partition and a replication factor of 1.
*
* @param topic The name of the topic.
@@ -181,8 +210,93 @@ public class EmbeddedKafkaCluster extends ExternalResource {
IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT);
}
- public void deleteTopic(final String topic) {
- brokers[0].deleteTopic(topic);
+ /**
+ * Deletes a topic and returns immediately, without waiting for the deletion to complete.
+ *
+ * @param topic the name of the topic
+ */
+ public void deleteTopic(final String topic) throws Exception {
+ deleteTopicsAndWait(-1L, topic);
+ }
+
+ /**
+ * Deletes a topic and blocks for at most 30 seconds ({@code TOPIC_DELETION_TIMEOUT}) until the topic has been deleted.
+ *
+ * @param topic the name of the topic
+ */
+ public void deleteTopicAndWait(final String topic) throws Exception {
+ deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
+ }
+
+ /**
+ * Deletes a topic and blocks for at most {@code timeoutMs} milliseconds until the topic has been deleted.
+ *
+ * @param timeoutMs the max time to wait for the topic to be deleted (does not block if {@code <= 0})
+ * @param topic the name of the topic
+ */
+ public void deleteTopicAndWait(final long timeoutMs, final String topic) throws Exception {
+ deleteTopicsAndWait(timeoutMs, topic);
+ }
+
+ /**
+ * Deletes multiple topics and returns immediately, without waiting for the deletions to complete.
+ *
+ * @param topics the names of the topics
+ */
+ public void deleteTopics(final String... topics) throws Exception {
+ deleteTopicsAndWait(-1, topics);
+ }
+
+ /**
+ * Deletes multiple topics and blocks for at most 30 seconds ({@code TOPIC_DELETION_TIMEOUT}) until all topics have been deleted.
+ *
+ * @param topics the names of the topics
+ */
+ public void deleteTopicsAndWait(final String... topics) throws Exception {
+ deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
+ }
+
+ /**
+ * Requests deletion of multiple topics via the first broker and, if {@code timeoutMs > 0}, blocks until none of them is listed anymore.
+ *
+ * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
+ * @param topics the names of the topics
+ */
+ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws Exception {
+ for (final String topic : topics) {
+ try {
+ brokers[0].deleteTopic(topic);
+ } catch (final UnknownTopicOrPartitionException e) { } // deliberately ignored; presumably the topic does not exist, so there is nothing to delete
+ }
+
+ if (timeoutMs > 0) {
+ TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+ }
+ }
+
+ public void deleteAndRecreateTopics(final String... topics) throws Exception { // delete the topics, wait for the deletion, then create them again
+ deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics); // waits using the default deletion timeout (TOPIC_DELETION_TIMEOUT)
+ createTopics(topics); // recreated with 1 partition and a replication factor of 1
+ }
+
+ public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws Exception { // same as above, with a caller-supplied deletion timeout
+ deleteTopicsAndWait(timeoutMs, topics); // does not block if timeoutMs <= 0
+ createTopics(topics); // recreated with 1 partition and a replication factor of 1
+ }
+
+ private final class TopicsDeletedCondition implements TestCondition { // met once none of the given topics is returned by zkUtils.getAllTopics()
+ final Set<String> deletedTopic = new HashSet<>(); // names of the topics expected to disappear
+
+ private TopicsDeletedCondition(final String... topics) {
+ Collections.addAll(deletedTopic, topics);
+ }
+
+ @Override
+ public boolean conditionMet() {
+ final Set<String> allTopics = new HashSet<>(); // fresh snapshot of the currently listed topics on every check
+ allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+ return !allTopics.removeAll(deletedTopic); // removeAll returns true if any expected-deleted topic was still listed
+ }
}
public List<KafkaServer> brokers() {