You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/28 01:55:54 UTC
kafka git commit: KAFKA-3612: Added structure for integration tests
Repository: kafka
Updated Branches:
refs/heads/trunk 8407dac6e -> 94aee2143
KAFKA-3612: Added structure for integration tests
Author: Eno Thereska <en...@gmail.com>
Reviewers: Ismael Juma, Damian Guy, Michael G. Noll, Guozhang Wang
Closes #1260 from enothereska/KAFKA-3612-integration-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94aee214
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94aee214
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94aee214
Branch: refs/heads/trunk
Commit: 94aee2143ed5290d27cdd4072c6ae9bb70a6ba30
Parents: 8407dac
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Apr 27 16:55:51 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 27 16:55:51 2016 -0700
----------------------------------------------------------------------
build.gradle | 2 +
checkstyle/import-control.xml | 11 ++
.../InternalTopicIntegrationTest.java | 169 +++++++++++++++++
.../utils/EmbeddedSingleNodeKafkaCluster.java | 128 +++++++++++++
.../integration/utils/IntegrationTestUtils.java | 157 +++++++++++++++
.../integration/utils/KafkaEmbedded.java | 189 +++++++++++++++++++
6 files changed, 656 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index dfa7c40..06c41d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -669,6 +669,8 @@ project(':streams') {
compile libs.jacksonDatabind // this dependency should be removed after KIP-4
testCompile project(':clients').sourceSets.test.output
+ testCompile project(':core')
+ testCompile project(':core').sourceSets.test.output
testCompile libs.junit
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e94698c..39d4ca3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -138,6 +138,17 @@
<allow pkg="com.fasterxml.jackson.databind" />
<allow pkg="org.apache.kafka.connect.json" />
</subpackage>
+
+ <subpackage name="integration">
+ <allow pkg="kafka.admin" />
+ <allow pkg="kafka.server" />
+ <allow pkg="kafka.utils" />
+ <allow pkg="kafka.zk" />
+ <allow pkg="kafka.log" />
+ <allow pkg="scala" />
+ <allow pkg="scala.collection" />
+ <allow pkg="org.I0Itec.zkclient" />
+ </subpackage>
<subpackage name="state">
<allow pkg="org.rocksdb" />
http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
new file mode 100644
index 0000000..2a3e767
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import kafka.admin.AdminUtils;
+import kafka.log.LogConfig;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Map;
+
+/**
+ * Tests related to internal topics in streams
+ */
+public class InternalTopicIntegrationTest {
+ // Embedded single-node Kafka cluster (one ZooKeeper instance, one broker) shared by all tests
+ // in this class; registered as a JUnit class rule.
+ @ClassRule
+ public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster();
+ // Topic the word-count topology reads from, and the topic it writes its counts to.
+ private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+ private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+ // Timeouts passed to the ZkClient that is used to inspect the topic configs.
+ private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
+ private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+
+    /**
+     * Creates the input and output topics used by the tests in this class. Despite its name,
+     * this method does not start the cluster itself; it only calls {@code createTopic} on it.
+     */
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        for (final String topic : Arrays.asList(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC)) {
+            cluster.createTopic(topic);
+        }
+    }
+
+ /**
+ * Validates that any state changelog topics are compacted
+ * @return true if topics have a valid config, false otherwise
+ */
+ private boolean isUsingCompactionForStateChangelogTopics() {
+ boolean valid = true;
+
+ // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
+ // createTopic() will only seem to work (it will return without error). The topic will exist in
+ // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+ // topic.
+ ZkClient zkClient = new ZkClient(
+ cluster.zKConnectString(),
+ DEFAULT_ZK_SESSION_TIMEOUT_MS,
+ DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+ ZKStringSerializer$.MODULE$);
+ boolean isSecure = false;
+ ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(cluster.zKConnectString()), isSecure);
+
+ Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
+ Iterator it = topicConfigs.iterator();
+ while (it.hasNext()) {
+ Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
+ String topic = topicConfig._1;
+ Properties prop = topicConfig._2;
+
+ // state changelogs should be compacted
+ if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
+ if (!prop.containsKey(LogConfig.CleanupPolicyProp()) ||
+ !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) {
+ valid = false;
+ break;
+ }
+ }
+ }
+ zkClient.close();
+ return valid;
+ }
+
+    /**
+     * Runs a small word-count topology against the embedded cluster, feeds it a few input
+     * records and then verifies that any state changelog topics are configured for compaction.
+     */
+    @Test
+    public void shouldCompactTopicsForStateChangelogs() throws Exception {
+        List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
+
+        //
+        // Step 1: Configure and start a simple word count topology
+        //
+        final Serde<String> stringSerde = Serdes.String();
+        final Serde<Long> longSerde = Serdes.Long();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
+
+        KStream<String, Long> wordCounts = textLines
+            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                @Override
+                public Iterable<String> apply(String value) {
+                    // Locale.ROOT keeps the lower-casing independent of the machine's locale.
+                    return Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+"));
+                }
+            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                @Override
+                public KeyValue<String, String> apply(String key, String value) {
+                    return new KeyValue<String, String>(value, value);
+                }
+            }).countByKey("Counts").toStream();
+
+        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+        try {
+            //
+            // Step 2: Produce some input data to the input topic.
+            //
+            Properties producerConfig = new Properties();
+            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+            producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+        } finally {
+            // Always stop the streams instance, even when producing the input data fails.
+            streams.close();
+        }
+
+        //
+        // Step 3: Verify the state changelog topics are compact
+        //
+        // JUnit's assertEquals takes the expected value before the actual one.
+        assertEquals("state changelog topics should be compacted",
+            true, isUsingCompactionForStateChangelogTopics());
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
new file mode 100644
index 0000000..34753ae
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+import kafka.server.KafkaConfig$;
+import kafka.zk.EmbeddedZookeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
+ */
+public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
+
+ private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class);
+ private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
+ // Both stay null until start() has assigned them.
+ private EmbeddedZookeeper zookeeper = null;
+ private KafkaEmbedded broker = null;
+
+    /**
+     * Creates and starts a Kafka cluster: first an embedded ZooKeeper instance, then a single
+     * broker that is configured to connect to it.
+     *
+     * @throws IOException if creating the embedded broker fails
+     * @throws InterruptedException declared in the signature; no statement in this method is
+     *         visibly interruptible
+     */
+    public void start() throws IOException, InterruptedException {
+        Properties brokerConfig = new Properties();
+
+        log.debug("Initiating embedded Kafka cluster startup");
+        log.debug("Starting a ZooKeeper instance");
+        zookeeper = new EmbeddedZookeeper();
+        log.debug("ZooKeeper instance is running at {}", zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
+
+        // The port was stored as an Integer and Properties.getProperty() only returns String
+        // values, so it is read back with get() to avoid logging "null".
+        log.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig$.MODULE$.PortProp()));
+        broker = new KafkaEmbedded(brokerConfig);
+
+        log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
+            broker.brokerList(), broker.zookeeperConnect());
+    }
+
+    /**
+     * Stop the Kafka cluster: first the broker, then the ZooKeeper instance.
+     *
+     * Components that were never started are skipped, and ZooKeeper is shut down even when
+     * stopping the broker throws.
+     */
+    public void stop() {
+        try {
+            if (broker != null) {
+                broker.stop();
+            }
+        } finally {
+            if (zookeeper != null) {
+                zookeeper.shutdown();
+            }
+        }
+    }
+
+ /**
+ * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
+ * Example: `localhost:2181`.
+ *
+ * You can use this to e.g. tell Kafka brokers how to connect to this instance.
+ *
+ * @return `localhost:` followed by the port reported by the embedded ZooKeeper instance
+ */
+ public String zKConnectString() {
+ return "localhost:" + zookeeper.port();
+ }
+
+ /**
+ * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`.
+ *
+ * You can use this to tell Kafka producers how to connect to this cluster.
+ *
+ * @return the broker list reported by the embedded broker
+ */
+ public String bootstrapServers() {
+ return broker.brokerList();
+ }
+
+    /**
+     * JUnit rule hook: starts the cluster before the tests run.
+     */
+    @Override
+    protected void before() throws Throwable {
+        start();
+    }
+
+    /**
+     * JUnit rule hook: stops the cluster after the tests have run.
+     */
+    @Override
+    protected void after() {
+        stop();
+    }
+
+    /**
+     * Creates a Kafka topic that has a single partition, a replication factor of 1 and no
+     * additional topic-level configuration.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic) {
+        final int partitions = 1;
+        final int replication = 1;
+        final Properties noTopicConfig = new Properties();
+        createTopic(topic, partitions, replication, noTopicConfig);
+    }
+
+    /**
+     * Creates a Kafka topic with the given partition count and replication factor and without
+     * any additional topic-level configuration.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (the partitions of) this topic.
+     */
+    public void createTopic(String topic, int partitions, int replication) {
+        final Properties noTopicConfig = new Properties();
+        createTopic(topic, partitions, replication, noTopicConfig);
+    }
+
+ /**
+ * Create a Kafka topic with the given parameters.
+ *
+ * Delegates to the embedded broker's `createTopic` with the same arguments.
+ *
+ * @param topic The name of the topic.
+ * @param partitions The number of partitions for this topic.
+ * @param replication The replication factor for (partitions of) this topic.
+ * @param topicConfig Additional topic-level configuration settings.
+ */
+ public void createTopic(String topic,
+ int partitions,
+ int replication,
+ Properties topicConfig) {
+ broker.createTopic(topic, partitions, replication, topicConfig);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
new file mode 100644
index 0000000..89fe0c4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Utility functions to make integration testing more convenient.
+ */
+public class IntegrationTestUtils {
+
+ // Sentinel for `maxMessages`: any value <= 0 disables the message-count limit when reading.
+ private static final int UNLIMITED_MESSAGES = -1;
+
+    /**
+     * Reads up to `maxMessages` records from the topic and returns only their values, in the
+     * order in which the records were consumed.
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumerConfig Kafka consumer configuration
+     * @param maxMessages    Maximum number of messages to read via the consumer.
+     * @return The values retrieved via the consumer.
+     */
+    public static <K, V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
+        final List<KeyValue<K, V>> keyValues = readKeyValues(topic, consumerConfig, maxMessages);
+        final List<V> values = new ArrayList<>();
+        for (int i = 0; i < keyValues.size(); i++) {
+            values.add(keyValues.get(i).value);
+        }
+        return values;
+    }
+
+ /**
+ * Returns as many messages as possible from the topic until a (currently hardcoded) timeout is
+ * reached.
+ *
+ * Equivalent to calling the three-argument overload with `UNLIMITED_MESSAGES`.
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumerConfig Kafka consumer configuration
+ * @return The KeyValue elements retrieved via the consumer.
+ */
+ public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) {
+ return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
+ }
+
+    /**
+     * Returns up to `maxMessages` records from the topic. A new consumer is created from
+     * `consumerConfig`, subscribed to `topic`, polled, and closed again before returning.
+     *
+     * Polling stops once the message limit is reached or once the sum of the requested poll
+     * timeouts (100 ms each) reaches 2000 ms, whichever happens first. The limit is only checked
+     * between polls, so the last poll may push the result past `maxMessages`.
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumerConfig Kafka consumer configuration
+     * @param maxMessages    Maximum number of messages to read via the consumer; a value <= 0
+     *                       means no limit
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
+        final int pollIntervalMs = 100;
+        final int maxTotalPollTimeMs = 2000;
+        final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+        final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
+        try {
+            consumer.subscribe(Collections.singletonList(topic));
+            int totalPollTimeMs = 0;
+            while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
+                totalPollTimeMs += pollIntervalMs;
+                ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+                for (ConsumerRecord<K, V> record : records) {
+                    consumedValues.add(new KeyValue<>(record.key(), record.value()));
+                }
+            }
+        } finally {
+            // Close the consumer even when subscribing or polling throws.
+            consumer.close();
+        }
+        return consumedValues;
+    }
+
+ private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
+ return maxMessages <= 0 || messagesConsumed < maxMessages;
+ }
+
+    /**
+     * Removes local state stores. Useful to reset state in-between integration test runs.
+     *
+     * Nothing is deleted when the state directory is not configured or when it does not resolve
+     * to a location under `/tmp`.
+     *
+     * @param streamsConfiguration Streams configuration settings
+     * @throws IOException declared in the signature for callers
+     */
+    public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
+        final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        if (path == null) {
+            return;
+        }
+        // Make the path absolute before normalizing so that ".." segments of a relative path are
+        // resolved against the working directory as well.
+        final File node = Paths.get(path).toAbsolutePath().normalize().toFile();
+        // Only purge state when it's under /tmp. This is a safety net to prevent accidentally
+        // deleting important local directory trees. The check compares whole path components, so
+        // a sibling directory such as /tmpfoo does not pass it.
+        if (node.toPath().startsWith(Paths.get("/tmp"))) {
+            Utils.delete(node);
+        }
+    }
+
+    /**
+     * Writes the given records to the topic one at a time, waiting for each send to complete
+     * before sending the next record. The producer is created from `producerConfig` and is
+     * closed before this method returns, including when a send fails.
+     *
+     * @param topic          Kafka topic to write the data records to
+     * @param records        Data records to write to Kafka
+     * @param producerConfig Kafka producer configuration
+     * @param <K>            Key type of the data records
+     * @param <V>            Value type of the data records
+     * @throws ExecutionException   if waiting on a send result reports a failure
+     * @throws InterruptedException if the thread is interrupted while waiting for a send result
+     */
+    public static <K, V> void produceKeyValuesSynchronously(
+        String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
+        throws ExecutionException, InterruptedException {
+        final Producer<K, V> producer = new KafkaProducer<>(producerConfig);
+        try {
+            for (KeyValue<K, V> record : records) {
+                Future<RecordMetadata> f = producer.send(
+                    new ProducerRecord<>(topic, record.key, record.value));
+                // Block until this record's send has completed.
+                f.get();
+            }
+            producer.flush();
+        } finally {
+            // Release the producer's resources even when a send fails.
+            producer.close();
+        }
+    }
+
+    /**
+     * Writes the given values to the topic as records with a null key, using
+     * {@code produceKeyValuesSynchronously}.
+     *
+     * @param topic          Kafka topic to write the data records to
+     * @param records        Values to write to Kafka
+     * @param producerConfig Kafka producer configuration
+     * @param <V>            Value type of the data records
+     */
+    public static <V> void produceValuesSynchronously(
+        String topic, Collection<V> records, Properties producerConfig)
+        throws ExecutionException, InterruptedException {
+        final List<KeyValue<Object, V>> nullKeyedRecords = new ArrayList<>();
+        for (final V recordValue : records) {
+            nullKeyedRecords.add(new KeyValue<Object, V>(null, recordValue));
+        }
+        produceKeyValuesSynchronously(topic, nullKeyedRecords, producerConfig);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
new file mode 100644
index 0000000..348b46b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.CoreUtils;
+import kafka.utils.SystemTime$;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import org.junit.rules.TemporaryFolder;
+/**
+ * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
+ * default.
+ *
+ * Requires a running ZooKeeper instance to connect to.
+ */
+public class KafkaEmbedded {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
+
+ // Fallback returned by zookeeperConnect() when the effective config has no `zookeeper.connect`.
+ private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
+ // Timeouts passed to the ZkClient that is used for creating topics.
+ private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
+ private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+ // Defaults merged with the caller's overrides; see effectiveConfigFrom().
+ private final Properties effectiveConfig;
+ // Folder created inside tmpFolder and used as the broker's log directory.
+ private final File logDir;
+ public final TemporaryFolder tmpFolder;
+ private final KafkaServer kafka;
+
+ /**
+ * Creates and starts an embedded Kafka broker.
+ * @param config Broker configuration settings. Used to modify, for example, on which port the
+ * broker should listen to. Note that you cannot change the `log.dirs` setting
+ * currently.
+ * @throws IOException if the temporary folder or the log directory cannot be created
+ */
+ public KafkaEmbedded(Properties config) throws IOException {
+ tmpFolder = new TemporaryFolder();
+ tmpFolder.create();
+ // logDir has to be assigned before effectiveConfigFrom() is called, because that method
+ // writes logDir's path into the broker config.
+ logDir = tmpFolder.newFolder();
+ effectiveConfig = effectiveConfigFrom(config);
+ boolean loggingEnabled = true;
+ KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
+ log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
+ logDir, zookeeperConnect());
+ kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
+ log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
+ brokerList(), zookeeperConnect());
+ }
+
+
+ /**
+ * Creates the configuration for starting the Kafka broker by merging default values with
+ * overwrites.
+ * @param initialConfig Broker configuration settings that override the default config.
+ * @return
+ * @throws IOException
+ */
+ private Properties effectiveConfigFrom(Properties initialConfig) throws IOException {
+ Properties effectiveConfig = new Properties();
+ effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
+ effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
+ effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
+ effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
+ effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+ effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
+ effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
+
+ effectiveConfig.putAll(initialConfig);
+ effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
+ return effectiveConfig;
+ }
+
+ /**
+ * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`.
+ *
+ * You can use this to tell Kafka producers and consumers how to connect to this instance.
+ *
+ * @return the configured host name, a colon, and the port the server reports as bound for the
+ * PLAINTEXT protocol
+ */
+ public String brokerList() {
+ return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT);
+ }
+
+
+ /**
+ * The ZooKeeper connection string aka `zookeeper.connect`.
+ *
+ * @return the `zookeeper.connect` value of the effective broker config, or `127.0.0.1:2181`
+ * when the config has no String value for it
+ */
+ public String zookeeperConnect() {
+ return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
+ }
+
+    /**
+     * Stop the broker, wait for its shutdown to complete and remove its log directory and
+     * temporary folder. The directories are removed even when shutting down throws.
+     */
+    public void stop() {
+        // Resolve both addresses once, up front, so that the final log line does not have to
+        // query the server again after it has been shut down.
+        final String brokerAddress = brokerList();
+        final String zkConnect = zookeeperConnect();
+        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
+            brokerAddress, zkConnect);
+        try {
+            kafka.shutdown();
+            kafka.awaitShutdown();
+        } finally {
+            log.debug("Removing logs.dir at {} ...", logDir);
+            List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
+            CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
+            tmpFolder.delete();
+        }
+        log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
+            brokerAddress, zkConnect);
+    }
+
+    /**
+     * Creates a Kafka topic that has a single partition, a replication factor of 1 and no
+     * additional topic-level configuration.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic) {
+        final int partitions = 1;
+        final int replication = 1;
+        final Properties noTopicConfig = new Properties();
+        createTopic(topic, partitions, replication, noTopicConfig);
+    }
+
+    /**
+     * Creates a Kafka topic with the given partition count and replication factor and without
+     * any additional topic-level configuration.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (the partitions of) this topic.
+     */
+    public void createTopic(String topic, int partitions, int replication) {
+        final Properties noTopicConfig = new Properties();
+        createTopic(topic, partitions, replication, noTopicConfig);
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * A ZooKeeper client is opened for the duration of the call and closed again afterwards,
+     * including when creating the topic fails.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (partitions of) this topic.
+     * @param topicConfig Additional topic-level configuration settings.
+     */
+    public void createTopic(String topic,
+                            int partitions,
+                            int replication,
+                            Properties topicConfig) {
+        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
+            topic, partitions, replication, topicConfig);
+
+        // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
+        // createTopic() will only seem to work (it will return without error). The topic will exist in
+        // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+        // topic.
+        final ZkClient zkClient = new ZkClient(
+            zookeeperConnect(),
+            DEFAULT_ZK_SESSION_TIMEOUT_MS,
+            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+            ZKStringSerializer$.MODULE$);
+        try {
+            final boolean isSecure = false;
+            final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
+        } finally {
+            // Close the client even when topic creation throws (for example for a topic that
+            // already exists), so that the ZooKeeper session is not left open.
+            zkClient.close();
+        }
+    }
+}
\ No newline at end of file