Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:45 UTC

[31/50] [abbrv] kafka git commit: KAFKA-3612: Added structure for integration tests

KAFKA-3612: Added structure for integration tests

Author: Eno Thereska <en...@gmail.com>

Reviewers: Ismael Juma, Damian Guy, Michael G. Noll, Guozhang Wang

Closes #1260 from enothereska/KAFKA-3612-integration-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94aee214
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94aee214
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94aee214

Branch: refs/heads/0.10.0
Commit: 94aee2143ed5290d27cdd4072c6ae9bb70a6ba30
Parents: 8407dac
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Apr 27 16:55:51 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 27 16:55:51 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   2 +
 checkstyle/import-control.xml                   |  11 ++
 .../InternalTopicIntegrationTest.java           | 169 +++++++++++++++++
 .../utils/EmbeddedSingleNodeKafkaCluster.java   | 128 +++++++++++++
 .../integration/utils/IntegrationTestUtils.java | 157 +++++++++++++++
 .../integration/utils/KafkaEmbedded.java        | 189 +++++++++++++++++++
 6 files changed, 656 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index dfa7c40..06c41d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -669,6 +669,8 @@ project(':streams') {
     compile libs.jacksonDatabind // this dependency should be removed after KIP-4
 
     testCompile project(':clients').sourceSets.test.output
+    testCompile project(':core')
+    testCompile project(':core').sourceSets.test.output
     testCompile libs.junit
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e94698c..39d4ca3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -138,6 +138,17 @@
       <allow pkg="com.fasterxml.jackson.databind" />
       <allow pkg="org.apache.kafka.connect.json" />
     </subpackage>
+    
+    <subpackage name="integration">
+      <allow pkg="kafka.admin" />
+      <allow pkg="kafka.server" />
+      <allow pkg="kafka.utils" />
+      <allow pkg="kafka.zk" />
+      <allow pkg="kafka.log" />
+      <allow pkg="scala" />
+      <allow pkg="scala.collection" />
+      <allow pkg="org.I0Itec.zkclient" />
+    </subpackage>
 
     <subpackage name="state">
       <allow pkg="org.rocksdb" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
new file mode 100644
index 0000000..2a3e767
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import kafka.admin.AdminUtils;
+import kafka.log.LogConfig;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Map;
+
+/**
+ * Tests related to internal topics in Kafka Streams.
+ */
+public class InternalTopicIntegrationTest {
+    @ClassRule
+    public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster();
+    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
+    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        cluster.createTopic(DEFAULT_INPUT_TOPIC);
+        cluster.createTopic(DEFAULT_OUTPUT_TOPIC);
+    }
+
+    /**
+     * Validates that all state changelog topics are configured with log compaction.
+     * @return true if all changelog topics have a valid cleanup policy, false otherwise
+     */
+    private boolean isUsingCompactionForStateChangelogTopics() {
+        boolean valid = true;
+
+        // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+        // createTopic() will only seem to work (it will return without error): the topic will exist
+        // only in ZooKeeper and will be returned when listing topics, but Kafka itself will not
+        // actually create the topic.
+        ZkClient zkClient = new ZkClient(
+            cluster.zKConnectString(),
+            DEFAULT_ZK_SESSION_TIMEOUT_MS,
+            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+            ZKStringSerializer$.MODULE$);
+        boolean isSecure = false;
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(cluster.zKConnectString()), isSecure);
+
+        Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
+        Iterator<Tuple2<String, Properties>> it = topicConfigs.iterator();
+        while (it.hasNext()) {
+            Tuple2<String, Properties> topicConfig = it.next();
+            String topic = topicConfig._1;
+            Properties prop = topicConfig._2;
+
+            // state changelogs should be compacted
+            if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
+                if (!prop.containsKey(LogConfig.CleanupPolicyProp()) ||
+                    !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) {
+                    valid = false;
+                    break;
+                }
+            }
+        }
+        zkClient.close();
+        return valid;
+    }
+
+    @Test
+    public void shouldCompactTopicsForStateChangelogs() throws Exception {
+        List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
+
+        //
+        // Step 1: Configure and start a simple word count topology
+        //
+        final Serde<String> stringSerde = Serdes.String();
+        final Serde<Long> longSerde = Serdes.Long();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
+
+        KStream<String, Long> wordCounts = textLines
+            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                @Override
+                public Iterable<String> apply(String value) {
+                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                }
+            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                @Override
+                public KeyValue<String, String> apply(String key, String value) {
+                    return new KeyValue<String, String>(value, value);
+                }
+            }).countByKey("Counts").toStream();
+
+        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        //
+        // Step 2: Produce some input data to the input topic.
+        //
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+
+        //
+        // Step 3: Verify the state changelog topics are compact
+        //
+        streams.close();
+        assertEquals(true, isUsingCompactionForStateChangelogTopics());
+    }
+}
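
For reference, the changelog topic the test above matches on follows the Kafka Streams naming
scheme `<application.id>-<storeName>` plus the changelog suffix. A minimal sketch of the name
construction (the literal "-changelog" value is an assumption for illustration; the test itself
relies on the authoritative ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX constant):

    // Sketch: the changelog topic name that shouldCompactTopicsForStateChangelogs() inspects.
    String applicationId = "compact-topics-integration-test";  // APPLICATION_ID_CONFIG above
    String storeName = "Counts";                                // from countByKey("Counts")
    String changelogTopic = applicationId + "-" + storeName
        + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;   // assumed: "...-Counts-changelog"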

http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
new file mode 100644
index 0000000..34753ae
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+import kafka.server.KafkaConfig$;
+import kafka.zk.EmbeddedZookeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
+ */
+public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class);
+    private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
+    private EmbeddedZookeeper zookeeper = null;
+    private KafkaEmbedded broker = null;
+
+    /**
+     * Creates and starts a Kafka cluster.
+     */
+    public void start() throws IOException, InterruptedException {
+        Properties brokerConfig = new Properties();
+
+        log.debug("Initiating embedded Kafka cluster startup");
+        log.debug("Starting a ZooKeeper instance");
+        zookeeper = new EmbeddedZookeeper();
+        log.debug("ZooKeeper instance is running at {}", zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
+
+        log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
+        broker = new KafkaEmbedded(brokerConfig);
+
+        log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
+            broker.brokerList(), broker.zookeeperConnect());
+    }
+
+    /**
+     * Stop the Kafka cluster.
+     */
+    public void stop() {
+        broker.stop();
+        zookeeper.shutdown();
+    }
+
+    /**
+     * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
+     * Example: `127.0.0.1:2181`.
+     *
+     * You can use this to e.g. tell Kafka brokers how to connect to this instance.
+     */
+    public String zKConnectString() {
+        return "localhost:" + zookeeper.port();
+    }
+
+    /**
+     * This cluster's `bootstrap.servers` value.  Example: `127.0.0.1:9092`.
+     *
+     * You can use this to tell Kafka producers how to connect to this cluster.
+     */
+    public String bootstrapServers() {
+        return broker.brokerList();
+    }
+
+    protected void before() throws Throwable {
+        start();
+    }
+
+    protected void after() {
+        stop();
+    }
+
+    /**
+     * Create a Kafka topic with 1 partition and a replication factor of 1.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic) {
+        createTopic(topic, 1, 1, new Properties());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (the partitions of) this topic.
+     */
+    public void createTopic(String topic, int partitions, int replication) {
+        createTopic(topic, partitions, replication, new Properties());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (partitions of) this topic.
+     * @param topicConfig Additional topic-level configuration settings.
+     */
+    public void createTopic(String topic,
+                            int partitions,
+                            int replication,
+                            Properties topicConfig) {
+        broker.createTopic(topic, partitions, replication, topicConfig);
+    }
+}
\ No newline at end of file
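
A minimal usage sketch for this rule (the class name and topic names below are hypothetical;
the pattern mirrors InternalTopicIntegrationTest above):

    import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
    import org.junit.BeforeClass;
    import org.junit.ClassRule;
    import org.junit.Test;

    public class MyStreamsIntegrationTest {
        // JUnit invokes before()/after() around the class, starting and stopping the cluster.
        @ClassRule
        public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster();

        @BeforeClass
        public static void createTopics() {
            cluster.createTopic("my-input-topic");         // 1 partition, replication factor 1
            cluster.createTopic("my-output-topic", 2, 1);  // 2 partitions, replication factor 1
        }

        @Test
        public void shouldReachBroker() {
            // Point clients at the embedded broker and ZooKeeper:
            String bootstrapServers = cluster.bootstrapServers();
            String zooKeeperConnect = cluster.zKConnectString();
        }
    }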

http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
new file mode 100644
index 0000000..89fe0c4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Utility functions to make integration testing more convenient.
+ */
+public class IntegrationTestUtils {
+
+    private static final int UNLIMITED_MESSAGES = -1;
+
+    /**
+     * Returns up to `maxMessages` message-values from the topic.
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumerConfig Kafka consumer configuration
+     * @param maxMessages    Maximum number of messages to read via the consumer.
+     * @return The values retrieved via the consumer.
+     */
+    public static <K, V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
+        List<V> returnList = new ArrayList<>();
+        List<KeyValue<K, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+        for (KeyValue<K, V> kv : kvs) {
+            returnList.add(kv.value);
+        }
+        return returnList;
+    }
+
+    /**
+     * Returns as many messages as possible from the topic until a (currently hardcoded) timeout is
+     * reached.
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumerConfig Kafka consumer configuration
+     * @return The KeyValue elements retrieved via the consumer.
+     */
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) {
+        return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
+    }
+
+    /**
+     * Returns up to `maxMessages` key-value messages from the given topic, using a consumer
+     * created from the provided configuration.
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumerConfig Kafka consumer configuration
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
+        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
+        consumer.subscribe(Collections.singletonList(topic));
+        int pollIntervalMs = 100;
+        int maxTotalPollTimeMs = 2000;
+        int totalPollTimeMs = 0;
+        List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+        while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
+            totalPollTimeMs += pollIntervalMs;
+            ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            for (ConsumerRecord<K, V> record : records) {
+                consumedValues.add(new KeyValue<>(record.key(), record.value()));
+            }
+        }
+        consumer.close();
+        return consumedValues;
+    }
+
+    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
+        return maxMessages <= 0 || messagesConsumed < maxMessages;
+    }
+
+    /**
+     * Removes local state stores.  Useful to reset state between integration test runs.
+     *
+     * @param streamsConfiguration Streams configuration settings
+     */
+    public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
+        String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        if (path != null) {
+            File node = Paths.get(path).normalize().toFile();
+            // Only purge state when it's under /tmp.  This is a safety net to prevent accidentally
+            // deleting important local directory trees.
+            if (node.getAbsolutePath().startsWith("/tmp")) {
+                Utils.delete(new File(node.getAbsolutePath()));
+            }
+        }
+    }
+
+    /**
+     * @param topic          Kafka topic to write the data records to
+     * @param records        Data records to write to Kafka
+     * @param producerConfig Kafka producer configuration
+     * @param <K>            Key type of the data records
+     * @param <V>            Value type of the data records
+     */
+    public static <K, V> void produceKeyValuesSynchronously(
+        String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
+        throws ExecutionException, InterruptedException {
+        Producer<K, V> producer = new KafkaProducer<>(producerConfig);
+        for (KeyValue<K, V> record : records) {
+            Future<RecordMetadata> f = producer.send(
+                new ProducerRecord<>(topic, record.key, record.value));
+            f.get();
+        }
+        producer.flush();
+        producer.close();
+    }
+
+    public static <V> void produceValuesSynchronously(
+        String topic, Collection<V> records, Properties producerConfig)
+        throws ExecutionException, InterruptedException {
+        Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
+        for (V value : records) {
+            KeyValue<Object, V> kv = new KeyValue<>(null, value);
+            keyedRecords.add(kv);
+        }
+        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig);
+    }
+
+}
\ No newline at end of file
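
A round-trip sketch using these helpers from inside a test (the topic name, group id, and the
surrounding `cluster` rule are assumptions; the serializer settings follow the producer config
used in InternalTopicIntegrationTest above):

    // Produce two string values synchronously, then read them back.
    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    IntegrationTestUtils.produceValuesSynchronously("some-topic",
        Arrays.asList("hello", "world"), producerConfig);

    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "integration-test-reader");
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    List<String> values = IntegrationTestUtils.readValues("some-topic", consumerConfig, 2);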

http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
new file mode 100644
index 0000000..348b46b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.CoreUtils;
+import kafka.utils.SystemTime$;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import org.junit.rules.TemporaryFolder;
+/**
+ * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
+ * default.
+ *
+ * Requires a running ZooKeeper instance to connect to.
+ */
+public class KafkaEmbedded {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
+
+    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
+    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
+    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+    private final Properties effectiveConfig;
+    private final File logDir;
+    public final TemporaryFolder tmpFolder;
+    private final KafkaServer kafka;
+
+    /**
+     * Creates and starts an embedded Kafka broker.
+     * @param config Broker configuration settings.  Used to modify, for example, the port on
+     *               which the broker listens.  Note that the `log.dirs` setting cannot
+     *               currently be changed.
+     */
+    public KafkaEmbedded(Properties config) throws IOException {
+        tmpFolder = new TemporaryFolder();
+        tmpFolder.create();
+        logDir = tmpFolder.newFolder();
+        effectiveConfig = effectiveConfigFrom(config);
+        boolean loggingEnabled = true;
+        KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
+        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
+            logDir, zookeeperConnect());
+        kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
+        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
+            brokerList(), zookeeperConnect());
+    }
+
+
+    /**
+     * Creates the configuration for starting the Kafka broker by merging default values with
+     * overrides.
+     * @param initialConfig Broker configuration settings that override the default config.
+     * @return The effective broker configuration.
+     * @throws IOException on I/O errors.
+     */
+    private Properties effectiveConfigFrom(Properties initialConfig) throws IOException {
+        Properties effectiveConfig = new Properties();
+        effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
+        effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
+        effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
+        effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
+        effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+        effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
+        effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
+
+        effectiveConfig.putAll(initialConfig);
+        effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
+        return effectiveConfig;
+    }
+
+    /**
+     * This broker's `metadata.broker.list` value.  Example: `127.0.0.1:9092`.
+     *
+     * You can use this to tell Kafka producers and consumers how to connect to this instance.
+     */
+    public String brokerList() {
+        return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT);
+    }
+
+
+    /**
+     * The ZooKeeper connection string aka `zookeeper.connect`.
+     */
+    public String zookeeperConnect() {
+        return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
+    }
+
+    /**
+     * Stop the broker.
+     */
+    public void stop() {
+        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
+            brokerList(), zookeeperConnect());
+        kafka.shutdown();
+        kafka.awaitShutdown();
+        log.debug("Removing logs.dir at {} ...", logDir);
+        List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
+        CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
+        tmpFolder.delete();
+        log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
+            brokerList(), zookeeperConnect());
+    }
+
+    /**
+     * Create a Kafka topic with 1 partition and a replication factor of 1.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic) {
+        createTopic(topic, 1, 1, new Properties());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (the partitions of) this topic.
+     */
+    public void createTopic(String topic, int partitions, int replication) {
+        createTopic(topic, partitions, replication, new Properties());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (partitions of) this topic.
+     * @param topicConfig Additional topic-level configuration settings.
+     */
+    public void createTopic(String topic,
+                            int partitions,
+                            int replication,
+                            Properties topicConfig) {
+        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
+            topic, partitions, replication, topicConfig);
+
+        // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+        // createTopic() will only seem to work (it will return without error): the topic will exist
+        // only in ZooKeeper and will be returned when listing topics, but Kafka itself will not
+        // actually create the topic.
+        ZkClient zkClient = new ZkClient(
+            zookeeperConnect(),
+            DEFAULT_ZK_SESSION_TIMEOUT_MS,
+            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+            ZKStringSerializer$.MODULE$);
+        boolean isSecure = false;
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
+        zkClient.close();
+    }
+}
\ No newline at end of file
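
Finally, a standalone sketch of driving KafkaEmbedded directly, mirroring what
EmbeddedSingleNodeKafkaCluster.start() and stop() do above (the topic name is hypothetical, and
the surrounding test method is assumed to declare `throws Exception`):

    // Start ZooKeeper first; the broker needs a running ensemble to connect to.
    EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
    Properties brokerConfig = new Properties();
    brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), "localhost:" + zookeeper.port());
    brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), "0");  // 0 = bind to a random free port
    KafkaEmbedded broker = new KafkaEmbedded(brokerConfig);

    broker.createTopic("standalone-topic");
    // ... exercise producers/consumers against broker.brokerList() ...

    // Shut down in reverse order: broker first, then ZooKeeper.
    broker.stop();
    zookeeper.shutdown();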