Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/30 01:21:56 UTC

[kafka] branch trunk updated: KAFKA-3625: Add public test utils for Kafka Streams (#4402)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a78f66a  KAFKA-3625: Add public test utils for Kafka Streams (#4402)
a78f66a is described below

commit a78f66a5ae7865418ee482c535e883598e1ec51f
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Jan 29 17:21:48 2018 -0800

    KAFKA-3625: Add public test utils for Kafka Streams (#4402)
    
    * KAFKA-3625: Add public test utils for Kafka Streams
     - add new artifact test-utils
     - add TopologyTestDriver
     - add MockTime, TestRecord, add TestRecordFactory
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>
---
 build.gradle                                       |  36 +-
 checkstyle/suppressions.xml                        |   8 +-
 .../kafka/clients/producer/ProducerRecord.java     |   2 +-
 docs/streams/upgrade-guide.html                    |   6 +
 settings.gradle                                    |   2 +-
 .../kafka/streams/processor/ProcessorContext.java  |   4 +-
 .../internals/InternalTopologyBuilder.java         |   6 +
 .../streams/processor/internals/StreamTask.java    |   4 +-
 .../state/internals/QueryableStoreProvider.java    |   2 +-
 .../state/internals/WrappingStoreProvider.java     |   2 +-
 .../kafka/streams/InternalTopologyAccessor.java    |   2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java    |   6 +-
 .../apache/kafka/streams/TopologyTestDriver.java   | 624 +++++++++++++++++++
 .../kafka/streams/test/ConsumerRecordFactory.java  | 415 ++++++++++++
 .../apache/kafka/streams/test/OutputVerifier.java  | 241 +++++++
 .../org/apache/kafka/streams/MockTimeTest.java     |  56 ++
 .../kafka/streams/TopologyTestDriverTest.java      | 692 +++++++++++++++++++++
 .../streams/test/ConsumerRecordFactoryTest.java    | 265 ++++++++
 .../kafka/streams/test/OutputVerifierTest.java     | 584 +++++++++++++++++
 19 files changed, 2943 insertions(+), 14 deletions(-)

diff --git a/build.gradle b/build.gradle
index 725cf0b..aedd943 100644
--- a/build.gradle
+++ b/build.gradle
@@ -513,7 +513,7 @@ for ( sv in availableScalaVersions ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:test-utils', 'streams:examples'] + connectPkgs
 
 /** Create one task per default Scala version */
 def withDefScalaVersions(taskName) {
@@ -729,6 +729,8 @@ project(':core') {
     from(project(':connect:file').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
     from(project(':streams').configurations.runtime) { into("libs/") }
+    from(project(':streams:test-utils').jar) { into("libs/") }
+    from(project(':streams:test-utils').configurations.runtime) { into("libs/") }
     from(project(':streams:examples').jar) { into("libs/") }
     from(project(':streams:examples').configurations.runtime) { into("libs/") }
     duplicatesStrategy 'exclude'
@@ -954,6 +956,38 @@ project(':streams') {
   }
 }
 
+project(':streams:test-utils') {
+  archivesBaseName = "kafka-streams-test-utils"
+
+  dependencies {
+    compile project(':streams')
+    compile project(':clients')
+
+    testCompile project(':clients').sourceSets.test.output
+    testCompile libs.junit
+
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/streams/test/**"
+    exclude "**/internals/**"
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.runtime) {
+      exclude('kafka-streams*')
+    }
+    into "$buildDir/dependant-libs-${versions.scala}"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
+
+}
+
 project(':streams:examples') {
   archivesBaseName = "kafka-streams-examples"
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 664a3f6..c6c2d23 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -153,7 +153,7 @@
     <suppress checks="NPathComplexity"
               files="StreamThread.java"/>
 
-    <!-- streams tests -->
+    <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
               files="(StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/>
 
@@ -185,6 +185,12 @@
     <suppress checks="NPathComplexity"
               files="KStreamKStreamLeftJoinTest.java"/>
 
+    <!-- Streams Test-Utils -->
+    <suppress checks="ClassFanOutComplexity"
+              files="TopologyTestDriver.java"/>
+    <suppress checks="ClassDataAbstractionCoupling"
+              files="TopologyTestDriver.java"/>
+
     <!-- Tools -->
     <suppress checks="ClassDataAbstractionCoupling"
               files="VerifiableConsumer.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index c8ff00b..5d6bb3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -104,7 +104,7 @@ public class ProducerRecord<K, V> {
      * @param value The record contents
      * @param headers The headers that will be included in the record
      */
-    public ProducerRecord(String topic, Integer partition, K key, V value,  Iterable<Header> headers) {
+    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
         this(topic, partition, null, key, value, headers);
     }
     
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 11c3a7e..1d5a342 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -71,6 +71,12 @@
 
 
     <p>
+	There is a new artifact <code>kafka-streams-test-utils</code> providing the <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> classes.
+	You can include the new artifact as a regular dependency in your unit tests and use the test driver to test the business logic of your Kafka Streams application.
+	For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
+    </p>
+
+    <p>
         The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier">KIP-220</a>
         enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients.
         You can provide the configs via <code>StreamsConfig</code> by adding the configs with the prefix <code>admin.</code> as defined by <code>StreamsConfig#adminClientPrefix(String)</code>
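
To show how the new artifact is meant to be used, a minimal test sketch is given below (assuming JUnit 4; the
buildTopology() helper, topic names, and expected values are placeholders, and the test-utils version must match
your Kafka Streams version):

    // build.gradle:
    //   testCompile 'org.apache.kafka:kafka-streams-test-utils:<version>'

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;
    import org.apache.kafka.streams.test.OutputVerifier;
    import org.junit.Test;

    public class MyTopologyTest {

        @Test
        public void shouldPipeSingleRecordThroughTopology() {
            final Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

            // buildTopology() is a placeholder for whatever creates the Topology under test;
            // it is assumed to read from "input-topic" and write to "output-topic" unmodified
            final Topology topology = buildTopology();
            final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
            try {
                final ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
                driver.pipeInput(factory.create("key1", "value1"));

                final ProducerRecord<String, String> output =
                    driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
                OutputVerifier.compareKeyValue(output, "key1", "value1");
            } finally {
                driver.close();
            }
        }
    }
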
diff --git a/settings.gradle b/settings.gradle
index f0fdf07..e599d01 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples', 'log4j-appender',
         'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 385d641..d4393aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -109,12 +109,12 @@ public interface ProcessorContext {
      *   by how long an iteration of the processing loop takes to complete</li>
      * </ul>
      *
-     * @param interval the time interval between punctuations
+     * @param intervalMs the time interval between punctuations in milliseconds
      * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
      * @param callback a function consuming timestamps representing the current stream or system time
      * @return a handle allowing cancellation of the punctuation schedule established by this method
      */
-    Cancellable schedule(long interval, PunctuationType type, Punctuator callback);
+    Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 2402d2e..7d4b592 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1019,6 +1019,12 @@ public class InternalTopologyBuilder {
         return Collections.unmodifiableMap(globalStateStores);
     }
 
+    public Set<String> allStateStoreName() {
+        final Set<String> allNames = new HashSet<>(stateFactories.keySet());
+        allNames.addAll(globalStateStores.keySet());
+        return Collections.unmodifiableSet(allNames);
+    }
+
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e293f25..11b2f89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -607,7 +607,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * Note, this is only called in the presence of new records
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    boolean maybePunctuateStreamTime() {
+    public boolean maybePunctuateStreamTime() {
         final long timestamp = partitionGroup.timestamp();
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
@@ -625,7 +625,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * Note, this is called irrespective of the presence of new records
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    boolean maybePunctuateSystemTime() {
+    public boolean maybePunctuateSystemTime() {
         final long timestamp = time.milliseconds();
 
         return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index b439ad5..971dcd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -57,7 +57,7 @@ public class QueryableStoreProvider {
             allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
         }
         if (allStores.isEmpty()) {
-            throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
+            throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
         }
         return queryableStoreType.create(
                 new WrappingStoreProvider(storeProviders),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index fdc01c6..36d0173 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -50,7 +50,7 @@ public class WrappingStoreProvider implements StateStoreProvider {
             allStores.addAll(stores);
         }
         if (allStores.isEmpty()) {
-            throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
+            throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
         }
         return allStores;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java b/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
index a6144f2..c3c4504 100644
--- a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
+++ b/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
  */
 public class InternalTopologyAccessor {
 
-    public static InternalTopologyBuilder getInternalTopologyBuilder(Topology topology) {
+    public static InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) {
         return topology.internalTopologyBuilder;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index e5f15bc..82c39ee 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -262,9 +262,9 @@ public class ProcessorTopologyTestDriver {
      * @param timestamp the raw message timestamp
      */
     public void process(final String topicName,
-                         final byte[] key,
-                         final byte[] value,
-                         final long timestamp) {
+                        final byte[] key,
+                        final byte[] value,
+                        final long timestamp) {
 
         final TopicPartition tp = partitionsByTopic.get(topicName);
         if (tp != null) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
new file mode 100644
index 0000000..6168640
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class makes it easier to write tests to verify the behavior of topologies created with {@link Topology} or
+ * {@link StreamsBuilder}.
+ * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources,
+ * processors, sinks, or sub-topologies.
+ * Best of all, the class works without a real Kafka broker, so the tests execute very quickly with very little overhead.
+ * <p>
+ * Using the {@code TopologyTestDriver} in tests is easy: simply instantiate the driver and provide a {@link Topology}
+ * (cf. {@link StreamsBuilder#build()}) and {@link Properties configs}, use the driver to supply an
+ * input message to the topology, and then use the driver to read and verify any messages output by the topology.
+ * <p>
+ * Although the driver doesn't use a real Kafka broker, it does simulate Kafka {@link Consumer consumers} and
+ * {@link Producer producers} that read and write raw {@code byte[]} messages.
+ * You can either deal with messages that have {@code byte[]} keys and values or use {@link ConsumerRecordFactory}
+ * and {@link OutputVerifier}, which work with regular Java objects instead of raw bytes.
+ *
+ * <h2>Driver setup</h2>
+ * In order to create a {@code TopologyTestDriver} instance, you need a {@link Topology} and a {@link Properties config}.
+ * The configuration needs to be representative of what you'd supply to the real topology, so that means including
+ * several key properties (cf. {@link StreamsConfig}).
+ * For example, the following code fragment creates a configuration that specifies a local Kafka broker list (which is
+ * needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
+ *
+ * <pre>{@code
+ * Properties props = new Properties();
+ * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ * props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+ * props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ * props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ * Topology topology = ...
+ * TopologyTestDriver driver = new TopologyTestDriver(topology, props);
+ * }</pre>
+ *
+ * <h2>Processing messages</h2>
+ * <p>
+ * Your test can supply new input records on any of the topics that the topology's sources consume.
+ * This test driver simulates single-partitioned input topics.
+ * Here's an example of an input message on the topic named {@code input-topic}:
+ *
+ * <pre>{@code
+ * ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(strSerializer, strSerializer);
+ * driver.pipeInput(factory.create("input-topic", "key1", "value1"));
+ * }</pre>
+ *
+ * When {@code #pipeInput()} is called, the driver passes the input message through to the appropriate source that
+ * consumes the named topic, and will invoke the processor(s) downstream of the source.
+ * If your topology's processors forward messages to sinks, your test can then consume these output messages to verify
+ * they match the expected outcome.
+ * For example, if our topology should have generated 2 messages on {@code output-topic-1} and 1 message on
+ * {@code output-topic-2}, then our test can obtain these messages using the
+ * {@link #readOutput(String, Deserializer, Deserializer)} method:
+ *
+ * <pre>{@code
+ * ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
+ * ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
+ * ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
+ * }</pre>
+ *
+ * Again, our example topology generates messages with string keys and values, so we supply our string deserializer
+ * instance for use on both the keys and values. Your test logic can then verify whether these output records are
+ * correct.
+ * Note that calling {@link ProducerRecord#equals(Object)} compares all attributes including key, value, timestamp,
+ * topic, partition, and headers.
+ * If you only want to compare key and value (and maybe timestamp), using {@link OutputVerifier} instead of
+ * {@link ProducerRecord#equals(Object)} can simplify your code as you can ignore attributes you are not interested in.
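+ * For example, to verify only the key and value of {@code record1} (cf. {@link OutputVerifier} for the available
+ * comparison methods):
+ *
+ * <pre>{@code
+ * OutputVerifier.compareKeyValue(record1, "key1", "value1");
+ * }</pre>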
+ * <p>
+ * Note that calling {@code pipeInput()} will also trigger {@link PunctuationType#STREAM_TIME event-time} based
+ * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) punctuation} callbacks.
+ * However, it will not trigger {@link PunctuationType#WALL_CLOCK_TIME wall-clock} type punctuations; those must be
+ * triggered manually via {@link #advanceWallClockTime(long)}.
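+ * For example, the following advances the mocked wall-clock time and fires all wall-clock punctuations that are due
+ * by then (the interval is a placeholder):
+ *
+ * <pre>{@code
+ * driver.advanceWallClockTime(20L); // advance mocked wall-clock time by 20 ms
+ * }</pre>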
+ * <p>
+ * Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
+ * {@link org.apache.kafka.streams.processor.Processor processors}.
+ *
+ * <h2>Processor state</h2>
+ * <p>
+ * Some processors use Kafka {@link StateStore state storage}, so this driver class provides the generic
+ * {@link #getStateStore(String)} as well as store-type specific methods so that your tests can check the underlying
+ * state store(s) used by your topology's processors.
+ * In our previous example, after we supplied a single input message and checked the three output messages, our test
+ * could also check the key value store to verify the processor correctly added, removed, or updated internal state.
+ * Or, our test might have pre-populated some state <em>before</em> submitting the input message, and verified afterward
+ * that the processor(s) correctly updated the state.
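+ * For example, assuming JUnit's {@code assertEquals} and a (placeholder) key-value store named "count-store" that
+ * holds a count per key:
+ *
+ * <pre>{@code
+ * KeyValueStore<String, Long> store = driver.getKeyValueStore("count-store");
+ * assertEquals(Long.valueOf(1L), store.get("key1"));
+ * }</pre>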
+ *
+ * @see ConsumerRecordFactory
+ * @see OutputVerifier
+ */
+@InterfaceStability.Evolving
+public class TopologyTestDriver {
+
+    private final Time mockTime;
+    private final InternalTopologyBuilder internalTopologyBuilder;
+
+    private final static int PARTITION_ID = 0;
+    private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
+    private StreamTask task;
+    private GlobalStateUpdateTask globalStateTask;
+
+    private final ProcessorTopology processorTopology;
+    private final MockProducer<byte[], byte[]> producer;
+
+    private final Set<String> internalTopics = new HashSet<>();
+    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
+    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
+    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
+
+    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
+
+    /**
+     * Create a new test driver instance.
+     * Initializes the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}.
+     *
+     * @param topology the topology to be tested
+     * @param config the configuration for the topology
+     */
+    public TopologyTestDriver(final Topology topology,
+                              final Properties config) {
+        this(topology, config, System.currentTimeMillis());
+    }
+
+    /**
+     * Create a new test driver instance.
+     *
+     * @param topology the topology to be tested
+     * @param config the configuration for the topology
+     * @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
+     */
+    public TopologyTestDriver(final Topology topology,
+                              final Properties config,
+                              final long initialWallClockTimeMs) {
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        mockTime = new MockTime(initialWallClockTimeMs);
+
+        internalTopologyBuilder = topology.internalTopologyBuilder;
+        internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+
+        processorTopology = internalTopologyBuilder.build(null);
+        final ProcessorTopology globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+        final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
+        producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
+            }
+        };
+
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final StateDirectory stateDirectory = new StateDirectory(streamsConfig, mockTime);
+        final StreamsMetrics streamsMetrics = new StreamsMetricsImpl(
+            new Metrics(),
+            "topology-test-driver-stream-metrics",
+            Collections.<String, String>emptyMap());
+        final ThreadCache cache = new ThreadCache(
+            new LogContext("topology-test-driver "),
+            Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+            streamsMetrics);
+        final StateRestoreListener stateRestoreListener = new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {}
+
+            @Override
+            public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {}
+
+            @Override
+            public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {}
+        };
+
+        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : internalTopologyBuilder.topicGroups().values()) {
+            internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+        }
+
+        for (final String topic : processorTopology.sourceTopics()) {
+            final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
+            partitionsByTopic.put(topic, tp);
+            offsetsByTopicPartition.put(tp, new AtomicLong());
+        }
+        consumer.assign(partitionsByTopic.values());
+
+        if (globalTopology != null) {
+            for (final String topicName : globalTopology.sourceTopics()) {
+                final TopicPartition partition = new TopicPartition(topicName, 0);
+                globalPartitionsByTopic.put(topicName, partition);
+                offsetsByTopicPartition.put(partition, new AtomicLong());
+                consumer.updatePartitions(topicName, Collections.singletonList(
+                    new PartitionInfo(topicName, 0, null, null, null)));
+                consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
+                consumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
+            }
+
+            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(
+                new LogContext("mock "),
+                globalTopology,
+                consumer,
+                stateDirectory,
+                stateRestoreListener,
+                streamsConfig);
+
+            final GlobalProcessorContextImpl globalProcessorContext
+                = new GlobalProcessorContextImpl(streamsConfig, stateManager, streamsMetrics, cache);
+            stateManager.setGlobalProcessorContext(globalProcessorContext);
+
+            globalStateTask = new GlobalStateUpdateTask(
+                globalTopology,
+                globalProcessorContext,
+                stateManager,
+                new LogAndContinueExceptionHandler(),
+                new LogContext());
+            globalStateTask.initialize();
+        }
+
+        if (!partitionsByTopic.isEmpty()) {
+            task = new StreamTask(
+                TASK_ID,
+                partitionsByTopic.values(),
+                processorTopology,
+                consumer,
+                new StoreChangelogReader(
+                    createRestoreConsumer(processorTopology.storeToChangelogTopic()),
+                    stateRestoreListener,
+                    new LogContext("topology-test-driver ")),
+                streamsConfig,
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                mockTime,
+                producer);
+            task.initializeStateStores();
+            task.initializeTopology();
+        }
+    }
+
+    /**
+     * Send an input message with the given key, value, and timestamp on the specified topic to the topology and then
+     * commit the message.
+     *
+     * @param consumerRecord the record to be processed
+     */
+    public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final String topicName = consumerRecord.topic();
+
+        final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+        if (topicPartition != null) {
+            final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
+            task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
+                topicName,
+                topicPartition.partition(),
+                offset,
+                consumerRecord.timestamp(),
+                consumerRecord.timestampType(),
+                consumerRecord.checksum(),
+                consumerRecord.serializedKeySize(),
+                consumerRecord.serializedValueSize(),
+                consumerRecord.key(),
+                consumerRecord.value())));
+            producer.clear();
+
+            // Process the record ...
+            ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName));
+            task.process();
+            task.maybePunctuateStreamTime();
+            task.commit();
+
+            // Capture all the records sent to the producer ...
+            for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
+                Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
+                if (outputRecords == null) {
+                    outputRecords = new LinkedList<>();
+                    outputRecordsByTopic.put(record.topic(), outputRecords);
+                }
+                outputRecords.add(record);
+
+                // Forward back into the topology if the produced record is to an internal or a source topic ...
+                final String outputTopicName = record.topic();
+                if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
+                    final byte[] serializedKey = record.key();
+                    final byte[] serializedValue = record.value();
+
+                    pipeInput(new ConsumerRecord<>(
+                        outputTopicName,
+                        -1,
+                        -1L,
+                        record.timestamp(),
+                        TimestampType.CREATE_TIME,
+                        0L,
+                        serializedKey == null ? 0 : serializedKey.length,
+                        serializedValue == null ? 0 : serializedValue.length,
+                        serializedKey,
+                        serializedValue));
+                }
+            }
+        } else {
+            final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
+            if (globalTopicPartition == null) {
+                throw new IllegalArgumentException("Unknown topic: " + topicName);
+            }
+            final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1;
+            globalStateTask.update(new ConsumerRecord<>(
+                globalTopicPartition.topic(),
+                globalTopicPartition.partition(),
+                offset,
+                consumerRecord.timestamp(),
+                consumerRecord.timestampType(),
+                consumerRecord.checksum(),
+                consumerRecord.serializedKeySize(),
+                consumerRecord.serializedValueSize(),
+                consumerRecord.key(),
+                consumerRecord.value()));
+            globalStateTask.flushState();
+        }
+    }
+
+    /**
+     * Send input messages to the topology and then commit each message individually.
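+     * This is equivalent to calling {@link #pipeInput(ConsumerRecord)} for each record in order, for example in
+     * combination with {@link ConsumerRecordFactory#create(String, List)} (topic name and content are placeholders):
+     * <pre>{@code
+     * driver.pipeInput(factory.create("input-topic", Arrays.asList(
+     *     new KeyValue<>("key1", "value1"),
+     *     new KeyValue<>("key2", "value2"))));
+     * }</pre>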
+     *
+     * @param records a list of records to be processed
+     */
+    public void pipeInput(final List<ConsumerRecord<byte[], byte[]>> records) {
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            pipeInput(record);
+        }
+    }
+
+    /**
+     * Advances the internally mocked wall-clock time.
+     * This might trigger {@link PunctuationType#WALL_CLOCK_TIME wall-clock} type
+     * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) punctuations}.
+     *
+     * @param advanceMs the amount of time to advance wall-clock time in milliseconds
+     */
+    public void advanceWallClockTime(final long advanceMs) {
+        mockTime.sleep(advanceMs);
+        task.maybePunctuateSystemTime();
+        task.commit();
+    }
+
+    /**
+     * Read the next record from the given topic.
+     * These records were output by the topology during the previous calls to {@link #pipeInput(ConsumerRecord)}.
+     *
+     * @param topic the name of the topic
+     * @return the next record on that topic, or {@code null} if there is no record available
+     */
+    public ProducerRecord<byte[], byte[]> readOutput(final String topic) {
+        final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
+        if (outputRecords == null) {
+            return null;
+        }
+        return outputRecords.poll();
+    }
+
+    /**
+     * Read the next record from the given topic.
+     * These records were output by the topology during the previous calls to {@link #pipeInput(ConsumerRecord)}.
+     *
+     * @param topic the name of the topic
+     * @param keyDeserializer the deserializer for the key type
+     * @param valueDeserializer the deserializer for the value type
+     * @return the next record on that topic, or {@code null} if there is no record available
+     */
+    public <K, V> ProducerRecord<K, V> readOutput(final String topic,
+                                                  final Deserializer<K> keyDeserializer,
+                                                  final Deserializer<V> valueDeserializer) {
+        final ProducerRecord<byte[], byte[]> record = readOutput(topic);
+        if (record == null) {
+            return null;
+        }
+        final K key = keyDeserializer.deserialize(record.topic(), record.key());
+        final V value = valueDeserializer.deserialize(record.topic(), record.value());
+        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
+    }
+
+    /**
+     * Get all {@link StateStore StateStores} from the topology.
+     * The stores can be "regular" or global stores.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+     * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     *
+     * @return all stores, keyed by store name
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    public Map<String, StateStore> getAllStateStores() {
+        final Map<String, StateStore> allStores = new HashMap<>();
+        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
+            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
+        }
+        return allStores;
+    }
+
+    /**
+     * Get the {@link StateStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+     * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the state store, or {@code null} if no store has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getKeyValueStore(String)
+     * @see #getWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    public StateStore getStateStore(final String name) {
+        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+    }
+
+    /**
+     * Get the {@link KeyValueStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+     * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the key value store, or {@code null} if no {@link KeyValueStore} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
+        final StateStore store = getStateStore(name);
+        return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
+    }
+
+    /**
+     * Get the {@link WindowStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+     * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the window store, or {@code null} if no {@link WindowStore} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> WindowStore<K, V> getWindowStore(final String name) {
+        final StateStore store = getStateStore(name);
+        return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
+    }
+
+    /**
+     * Get the {@link SessionStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+     * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the session store, or {@code null} if no {@link SessionStore} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getWindowStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> SessionStore<K, V> getSessionStore(final String name) {
+        final StateStore store = getStateStore(name);
+        return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
+    }
+
+    /**
+     * Close the driver, its topology, and all processors.
+     */
+    public void close() {
+        if (task != null) {
+            task.close(true, false);
+        }
+        if (globalStateTask != null) {
+            try {
+                globalStateTask.close();
+            } catch (final IOException e) {
+                // ignore
+            }
+        }
+    }
+
+    static class MockTime implements Time {
+        private final AtomicLong timeMs;
+        private final AtomicLong highResTimeNs;
+
+        MockTime(final long startTimestampMs) {
+            this.timeMs = new AtomicLong(startTimestampMs);
+            this.highResTimeNs = new AtomicLong(startTimestampMs * 1000L * 1000L);
+        }
+
+        @Override
+        public long milliseconds() {
+            return timeMs.get();
+        }
+
+        @Override
+        public long nanoseconds() {
+            return highResTimeNs.get();
+        }
+
+        @Override
+        public long hiResClockMs() {
+            return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
+        }
+
+        @Override
+        public void sleep(final long ms) {
+            if (ms < 0) {
+                throw new IllegalArgumentException("Sleep ms cannot be negative.");
+            }
+            timeMs.addAndGet(ms);
+            highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
+        }
+    }
+
+    private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
+            @Override
+            public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
+
+            @Override
+            public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
+
+            @Override
+            public synchronized long position(final TopicPartition partition) {
+                return 0L;
+            }
+        };
+
+        // for each store
+        for (final Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) {
+            final String topicName = storeAndTopic.getValue();
+            // Set up the restore-state topic ...
+            // consumer.subscribe(new TopicPartition(topicName, 0));
+            // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
+            final List<PartitionInfo> partitionInfos = new ArrayList<>();
+            partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
+            consumer.updatePartitions(topicName, partitionInfos);
+            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
+        }
+        return consumer;
+    }
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
new file mode 100644
index 0000000..ea8d632
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.TopologyTestDriver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Factory to create {@link ConsumerRecord consumer records} for a single topic with a single partition, using the
+ * given key and value {@link Serializer serializers}.
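+ * <p>
+ * A test typically creates a single factory and uses it to generate input records for
+ * {@link TopologyTestDriver#pipeInput(ConsumerRecord)}, for example (topic name and serializers are placeholders):
+ *
+ * <pre>{@code
+ * ConsumerRecordFactory<String, Integer> factory =
+ *     new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
+ * driver.pipeInput(factory.create("key1", 42));
+ * }</pre>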
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ *
+ * @see TopologyTestDriver
+ */
+@InterfaceStability.Evolving
+public class ConsumerRecordFactory<K, V> {
+    private final String topicName;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+    private long timeMs;
+    private long advanceMs;
+
+    /**
+     * Create a new factory for the given topic.
+     * Uses current system time as start timestamp.
+     * Auto-advance is disabled.
+     *
+     * @param keySerializer the key serializer
+     * @param valueSerializer the value serializer
+     */
+    public ConsumerRecordFactory(final Serializer<K> keySerializer,
+                                 final Serializer<V> valueSerializer) {
+        this(null, keySerializer, valueSerializer, System.currentTimeMillis());
+    }
+
+    /**
+     * Create a new factory for the given topic.
+     * Uses current system time as start timestamp.
+     * Auto-advance is disabled.
+     *
+     * @param defaultTopicName the default topic name used for all generated {@link ConsumerRecord consumer records}
+     * @param keySerializer the key serializer
+     * @param valueSerializer the value serializer
+     */
+    public ConsumerRecordFactory(final String defaultTopicName,
+                                 final Serializer<K> keySerializer,
+                                 final Serializer<V> valueSerializer) {
+        this(defaultTopicName, keySerializer, valueSerializer, System.currentTimeMillis());
+    }
+
+    /**
+     * Create a new factory for the given topic.
+     * Auto-advance is disabled.
+     *
+     * @param keySerializer the key serializer
+     * @param valueSerializer the value serializer
+     * @param startTimestampMs the initial timestamp for generated records
+     */
+    public ConsumerRecordFactory(final Serializer<K> keySerializer,
+                                 final Serializer<V> valueSerializer,
+                                 final long startTimestampMs) {
+        this(null, keySerializer, valueSerializer, startTimestampMs, 0L);
+    }
+
+    /**
+     * Create a new factory for the given topic.
+     * Auto-advance is disabled.
+     *
+     * @param defaultTopicName the topic name used for all generated {@link ConsumerRecord consumer records}
+     * @param keySerializer the key serializer
+     * @param valueSerializer the value serializer
+     * @param startTimestampMs the initial timestamp for generated records
+     */
+    public ConsumerRecordFactory(final String defaultTopicName,
+                                 final Serializer<K> keySerializer,
+                                 final Serializer<V> valueSerializer,
+                                 final long startTimestampMs) {
+        this(defaultTopicName, keySerializer, valueSerializer, startTimestampMs, 0L);
+    }
+
+    /**
+     * Create a new factory for the given topic.
+     *
+     * @param keySerializer the key serializer
+     * @param valueSerializer the value serializer
+     * @param startTimestampMs the initial timestamp for generated records
+     * @param autoAdvanceMs the time increment per generated record
+     */
+    public ConsumerRecordFactory(final Serializer<K> keySerializer,
+                                 final Serializer<V> valueSerializer,
+                                 final long startTimestampMs,
+                                 final long autoAdvanceMs) {
+        this(null, keySerializer, valueSerializer, startTimestampMs, autoAdvanceMs);
+    }
+
+    /**
+     * Create a new factory for the given topic.
+     *
+     * @param defaultTopicName the topic name used for all generated {@link ConsumerRecord consumer records}
+     * @param keySerializer the key serializer
+     * @param valueSerializer the value serializer
+     * @param startTimestampMs the initial timestamp for generated records
+     * @param autoAdvanceMs the time increment per generated record
+     */
+    public ConsumerRecordFactory(final String defaultTopicName,
+                                 final Serializer<K> keySerializer,
+                                 final Serializer<V> valueSerializer,
+                                 final long startTimestampMs,
+                                 final long autoAdvanceMs) {
+        Objects.requireNonNull(keySerializer, "keySerializer cannot be null");
+        Objects.requireNonNull(valueSerializer, "valueSerializer cannot be null");
+        this.topicName = defaultTopicName;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        timeMs = startTimestampMs;
+        advanceMs = autoAdvanceMs;
+    }
+
+    /**
+     * Advances the internally tracked time.
+     *
+     * @param advanceMs the amount of time to advance
+     */
+    public void advanceTimeMs(final long advanceMs) {
+        if (advanceMs < 0) {
+            throw new IllegalArgumentException("advanceMs must be positive");
+        }
+        timeMs += advanceMs;
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp.
+     * Does not auto-advance the internally tracked time.
+     *
+     * @param topicName the topic name
+     * @param key the record key
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final K key,
+                                                 final V value,
+                                                 final long timestampMs) {
+        Objects.requireNonNull(topicName, "topicName cannot be null.");
+        final byte[] serializedKey = keySerializer.serialize(topicName, key);
+        final byte[] serializedValue = valueSerializer.serialize(topicName, value);
+        return new ConsumerRecord<>(
+            topicName,
+            -1,
+            -1L,
+            timestampMs,
+            TimestampType.CREATE_TIME,
+            0L,
+            serializedKey == null ? 0 : serializedKey.length,
+            serializedValue == null ? 0 : serializedValue.length,
+            serializedKey,
+            serializedValue);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and given key, value, and timestamp.
+     * Does not auto-advance the internally tracked time.
+     *
+     * @param key the record key
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final K key,
+                                                 final V value,
+                                                 final long timestampMs) {
+        if (topicName == null) {
+            throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+                "Use #create(String topicName, K key, V value, long timestampMs) instead.");
+        }
+        return create(topicName, key, value, timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the given topic name, key, and value.
+     * The timestamp will be generated based on the constructor-provided start timestamp, and time will auto-advance.
+     *
+     * @param topicName the topic name
+     * @param key the record key
+     * @param value the record value
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final K key,
+                                                 final V value) {
+        final long timestamp = timeMs;
+        timeMs += advanceMs;
+        return create(topicName, key, value, timestamp);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with default topic name and given key and value.
+     * The timestamp will be generated based on the constructor-provided start timestamp, and time will auto-advance.
+     *
+     * @param key the record key
+     * @param value the record value
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final K key,
+                                                 final V value) {
+        if (topicName == null) {
+            throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+                "Use #create(String topicName, K key, V value) instead.");
+        }
+        return create(topicName, key, value);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and timestamp.
+     *
+     * @param topicName the topic name
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final V value,
+                                                 final long timestampMs) {
+        return create(topicName, null, value, timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the default topic name and {@code null}-key as well as the given value and timestamp.
+     *
+     * @param value the record value
+     * @param timestampMs the record timestamp
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final V value,
+                                                 final long timestampMs) {
+        if (topicName == null) {
+            throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+                "Use #create(String topicName, V value, long timestampMs) instead.");
+        }
+        return create(topicName, value, timestampMs);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name and value.
+     * The timestamp is taken from the internally tracked time (seeded via the constructor), and the tracked time auto-advances afterwards.
+     *
+     * @param topicName the topic name
+     * @param value the record value
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final String topicName,
+                                                 final V value) {
+        return create(topicName, null, value);
+    }
+
+    /**
+     * Create a {@link ConsumerRecord} with the default topic name and {@code null}-key as well as the given value.
+     * The timestamp is taken from the internally tracked time (seeded via the constructor), and the tracked time auto-advances afterwards.
+     *
+     * @param value the record value
+     * @return the generated {@link ConsumerRecord}
+     */
+    public ConsumerRecord<byte[], byte[]> create(final V value) {
+        if (topicName == null) {
+            throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+                "Use #create(String topicName, V value, long timestampMs) instead.");
+        }
+        return create(topicName, value);
+    }
+
+    /**
+     * Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
+     * Each record's timestamp is taken from the internally tracked time, which auto-advances after every record.
+     *
+     * @param topicName the topic name
+     * @param keyValues the record keys and values
+     * @return the generated {@link ConsumerRecord consumer records}
+     */
+    public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
+                                                       final List<KeyValue<K, V>> keyValues) {
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
+
+        for (final KeyValue<K, V> keyValue : keyValues) {
+            records.add(create(topicName, keyValue.key, keyValue.value));
+        }
+
+        return records;
+    }
+
+    /**
+     * Creates {@link ConsumerRecord consumer records} with the default topic name as well as the given keys and values.
+     * Each record's timestamp is taken from the internally tracked time, which auto-advances after every record.
+     *
+     * @param keyValues the record keys and values
+     * @return the generated {@link ConsumerRecord consumer records}
+     */
+    public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues) {
+        if (topicName == null) {
+            throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+                "Use #create(String topicName, List<KeyValue<K, V>> keyValues) instead.");
+        }
+
+        return create(topicName, keyValues);
+    }
+
+    /**
+     * Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
+     * Does not auto-advance the internally tracked time.
+     *
+     * @param topicName the topic name
+     * @param keyValues the record keys and values
+     * @param startTimestamp the timestamp for the first generated record
+     * @param advanceMs the time difference between two consecutive generated records
+     * @return the generated {@link ConsumerRecord consumer records}
+     */
+    public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
+                                                       final List<KeyValue<K, V>> keyValues,
+                                                       final long startTimestamp,
+                                                       final long advanceMs) {
+        if (advanceMs < 0) {
+            throw new IllegalArgumentException("advanceMs cannot be negative");
+        }
+
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
+
+        long timestamp = startTimestamp;
+        for (final KeyValue<K, V> keyValue : keyValues) {
+            records.add(create(topicName, keyValue.key, keyValue.value, timestamp));
+            timestamp += advanceMs;
+        }
+
+        return records;
+    }
+
+    /**
+     * Creates {@link ConsumerRecord consumer records} with the default topic name as well as the given keys and values.
+     * Does not auto-advance the internally tracked time.
+     *
+     * @param keyValues the record keys and values
+     * @param startTimestamp the timestamp for the first generated record
+     * @param advanceMs the time difference between two consecutive generated records
+     * @return the generated {@link ConsumerRecord consumer records}
+     */
+    public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
+                                                       final long startTimestamp,
+                                                       final long advanceMs) {
+        if (topicName == null) {
+            throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+                "Use #create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs) instead.");
+        }
+
+        return create(topicName, keyValues, startTimestamp, advanceMs);
+    }
+
+    /**
+     * Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
+     * For each generated record, the timestamp is advanced by 1 millisecond.
+     * Does not auto-advance the internally tracked time.
+     *
+     * @param topicName the topic name
+     * @param keyValues the record keys and values
+     * @param startTimestamp the timestamp for the first generated record
+     * @return the generated {@link ConsumerRecord consumer records}
+     */
+    public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
+                                                       final List<KeyValue<K, V>> keyValues,
+                                                       final long startTimestamp) {
+        return create(topicName, keyValues, startTimestamp, 1);
+    }
+
+    /**
+     * Creates {@link ConsumerRecord consumer records} with the default topic name and the given keys and values.
+     * For each generated record, the timestamp is advanced by 1 millisecond.
+     * Does not auto-advance the internally tracked time.
+     *
+     * @param keyValues the record keys and values
+     * @param startTimestamp the timestamp for the first generated record
+     * @return the generated {@link ConsumerRecord consumer records}
+     */
+    public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
+                                                       final long startTimestamp) {
+        if (topicName == null) {
+            throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+                "Use #create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp) instead.");
+        }
+
+        return create(topicName, keyValues, startTimestamp, 1);
+    }
+
+}
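
For reference, a minimal usage sketch of the factory added above (not part of this patch; the topic name "input-topic" and the String/Integer serializers are assumptions for illustration):

    // Factory with a default topic, start timestamp 0ms, and auto-advance of 10ms per record.
    final ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic", new StringSerializer(), new IntegerSerializer(), 0L, 10L);

    final ConsumerRecord<byte[], byte[]> r1 = factory.create("a", 1);       // timestamp 0
    final ConsumerRecord<byte[], byte[]> r2 = factory.create("b", 2);       // timestamp 10 (auto-advanced)
    final ConsumerRecord<byte[], byte[]> r3 = factory.create("c", 3, 42L);  // explicit timestamp, tracked time untouched
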
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
new file mode 100644
index 0000000..09ed294
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.TopologyTestDriver;
+
+import java.util.Objects;
+
+/**
+ * Helper class to verify topology result records.
+ *
+ * @see TopologyTestDriver
+ */
+public class OutputVerifier {
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided value and throws an {@link AssertionError} if the
+     * {@code ProducerRecord}'s value is not equal to the expected value.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedValue}
+     */
+    public static <K, V> void compareValue(final ProducerRecord<K, V> record,
+                                           final V expectedValue) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final V recordValue = record.value();
+        final AssertionError error = new AssertionError("Expected value=" + expectedValue + " but was value=" + recordValue);
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the values of two {@link ProducerRecord}s and throws an {@link AssertionError} if they are not equal to
+     * each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedRecord}'s value
+     */
+    public static <K, V> void compareValue(final ProducerRecord<K, V> record,
+                                           final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareValue(record, expectedRecord.value());
+    }
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided key and value and throws an {@link AssertionError} if the
+     * {@code ProducerRecord}'s key or value is not equal to the expected key or value.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedKey the expected key of the {@code ProducerRecord}
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedKey} or {@code expectedValue}
+     */
+    public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
+                                              final K expectedKey,
+                                              final V expectedValue) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> " +
+            "but was <" + recordKey + ", " + recordValue + ">");
+
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the keys and values of two {@link ProducerRecord}s and throws an {@link AssertionError} if the keys or
+     * values are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedRecord}'s key or value
+     */
+    public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
+                                              final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareKeyValue(record, expectedRecord.key(), expectedRecord.value());
+    }
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided value and timestamp and throws an {@link AssertionError} if
+     * the {@code ProducerRecord}'s value or timestamp is not equal to the expected value or timestamp.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedTimestamp the expected timestamp of the {@code ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedValue} or {@code expectedTimestamp}
+     */
+    public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
+                                                    final V expectedValue,
+                                                    final long expectedTimestamp) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final V recordValue = record.value();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected value=" + expectedValue + " with timestamp=" + expectedTimestamp +
+            " but was value=" + recordValue + " with timestamp=" + recordTimestamp);
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the values and timestamps of two {@link ProducerRecord}s and throws an {@link AssertionError} if the
+     * values or timestamps are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedRecord}'s value or timestamp
+     */
+    public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
+                                                    final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareValueTimestamp(record, expectedRecord.value(), expectedRecord.timestamp());
+    }
+
+    /**
+     * Compares a {@link ProducerRecord} with the provided key, value, and timestamp and throws an
+     * {@link AssertionError} if the {@code ProducerRecord}'s key, value, or timestamp is not equal to the expected key,
+     * value, or timestamp.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedKey the expected key of the {@code ProducerRecord}
+     * @param expectedValue the expected value of the {@code ProducerRecord}
+     * @param expectedTimestamp the expected timestamp of the {@code ProducerRecord}
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key, value, or timestamp is not equal to {@code expectedKey},
+     * {@code expectedValue}, or {@code expectedTimestamp}
+     */
+    public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
+                                                       final K expectedKey,
+                                                       final V expectedValue,
+                                                       final long expectedTimestamp) throws AssertionError {
+        Objects.requireNonNull(record);
+
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
+            " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
+
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    /**
+     * Compares the keys, values, and timestamps of two {@link ProducerRecord}s and throws an {@link AssertionError} if
+     * the keys, values, or timestamps are not equal to each other.
+     *
+     * @param record an output {@code ProducerRecord} for verification
+     * @param expectedRecord a {@code ProducerRecord} for verification
+     * @param <K> the key type
+     * @param <V> the value type
+     * @throws AssertionError if {@code ProducerRecord}'s key, value, or timestamp is not equal to
+     * {@code expectedRecord}'s key, value, or timestamp
+     */
+    public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
+                                                       final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+        Objects.requireNonNull(expectedRecord);
+        compareKeyValueTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.timestamp());
+    }
+
+}
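
For reference, a typical assertion flow with the verifier added above (not part of this patch; assumes a running TopologyTestDriver named testDriver, and the topic name and deserializers are placeholders):

    final ProducerRecord<String, Long> output =
        testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
    // Throws a descriptive AssertionError if key, value, or timestamp does not match.
    OutputVerifier.compareKeyValueTimestamp(output, "some-key", 21L, 42L);
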
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockTimeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockTimeTest.java
new file mode 100644
index 0000000..bc11b70
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockTimeTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockTimeTest {
+
+    @Test
+    public void shouldSetStartTime() {
+        final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
+        assertEquals(42L, time.milliseconds());
+        assertEquals(42L * 1000L * 1000L, time.nanoseconds());
+    }
+
+    @Test
+    public void shouldGetNanosAsMillis() {
+        final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
+        assertEquals(42L, time.hiResClockMs());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotAllowNegativeSleep() {
+        new TopologyTestDriver.MockTime(42).sleep(-1L);
+    }
+
+    @Test
+    public void shouldAdvanceTimeOnSleep() {
+        final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
+
+        assertEquals(42L, time.milliseconds());
+        time.sleep(1L);
+        assertEquals(43L, time.milliseconds());
+        time.sleep(0L);
+        assertEquals(43L, time.milliseconds());
+        time.sleep(3L);
+        assertEquals(46L, time.milliseconds());
+    }
+
+}
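
For reference (not part of this patch), the clock semantics these tests pin down: milliseconds() and nanoseconds() share a single counter, so sleep() advances both in lock step.

    final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(0L);
    time.sleep(10L);
    assertEquals(10L, time.milliseconds());
    assertEquals(10L * 1000L * 1000L, time.nanoseconds()); // nanos = millis * 1,000,000
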
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
new file mode 100644
index 0000000..4100f18
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TopologyTestDriverTest {
+    private final static String SOURCE_TOPIC_1 = "source-topic-1";
+    private final static String SOURCE_TOPIC_2 = "source-topic-2";
+    private final static String SINK_TOPIC_1 = "sink-topic-1";
+    private final static String SINK_TOPIC_2 = "sink-topic-2";
+
+    private final ConsumerRecordFactory<byte[], byte[]> consumerRecordFactory = new ConsumerRecordFactory<>(
+        new ByteArraySerializer(),
+        new ByteArraySerializer());
+
+    private final byte[] key1 = new byte[0];
+    private final byte[] value1 = new byte[0];
+    private final long timestamp1 = 42L;
+    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, timestamp1);
+
+    private final byte[] key2 = new byte[0];
+    private final byte[] value2 = new byte[0];
+    private final long timestamp2 = 43L;
+    private final ConsumerRecord<byte[], byte[]> consumerRecord2 = consumerRecordFactory.create(SOURCE_TOPIC_2, key2, value2, timestamp2);
+
+    private TopologyTestDriver testDriver;
+    private final Properties config = new Properties() {
+        {
+            put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver");
+            put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+            put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        }
+    };
+
+    private final static class Record {
+        private Object key;
+        private Object value;
+        private long timestamp;
+        private long offset;
+        private String topic;
+
+        Record(final ConsumerRecord consumerRecord) {
+            key = consumerRecord.key();
+            value = consumerRecord.value();
+            timestamp = consumerRecord.timestamp();
+            offset = consumerRecord.offset();
+            topic = consumerRecord.topic();
+        }
+
+        Record(final Object key,
+               final Object value,
+               final long timestamp,
+               final long offset,
+               final String topic) {
+            this.key = key;
+            this.value = value;
+            this.timestamp = timestamp;
+            this.offset = offset;
+            this.topic = topic;
+        }
+
+        @Override
+        public String toString() {
+            return "key: " + key + ", value: " + value + ", timestamp: " + timestamp + ", offset: " + offset + ", topic: " + topic;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final Record record = (Record) o;
+            return timestamp == record.timestamp &&
+                offset == record.offset &&
+                Objects.equals(key, record.key) &&
+                Objects.equals(value, record.value) &&
+                Objects.equals(topic, record.topic);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value, timestamp, offset, topic);
+        }
+    }
+
+    private final static class Punctuation {
+        private final long intervalMs;
+        private final PunctuationType punctuationType;
+        private final Punctuator callback;
+
+        Punctuation(final long intervalMs,
+                    final PunctuationType punctuationType,
+                    final Punctuator callback) {
+            this.intervalMs = intervalMs;
+            this.punctuationType = punctuationType;
+            this.callback = callback;
+        }
+    }
+
+    private final class MockPunctuator implements Punctuator {
+        private final List<Long> punctuatedAt = new LinkedList<>();
+
+        @Override
+        public void punctuate(long timestamp) {
+            punctuatedAt.add(timestamp);
+        }
+    }
+
+    private final class MockProcessor implements Processor {
+        private final Collection<Punctuation> punctuations;
+        private ProcessorContext context;
+
+        private boolean initialized = false;
+        private boolean closed = false;
+        private final List<Record> processedRecords = new ArrayList<>();
+
+        MockProcessor() {
+            this(Collections.<Punctuation>emptySet());
+        }
+
+        MockProcessor(final Collection<Punctuation> punctuations) {
+            this.punctuations = punctuations;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            initialized = true;
+            this.context = context;
+            for (final Punctuation punctuation : punctuations) {
+                this.context.schedule(punctuation.intervalMs, punctuation.punctuationType, punctuation.callback);
+            }
+        }
+
+        @Override
+        public void process(Object key, Object value) {
+            processedRecords.add(new Record(key, value, context.timestamp(), context.offset(), context.topic()));
+            context.forward(key, value);
+        }
+
+        @Override
+        public void punctuate(long timestamp) {} // deprecated
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+    }
+
+    private final List<MockProcessor> mockProcessors = new ArrayList<>();
+
+    private final class MockProcessorSupplier implements ProcessorSupplier {
+        private final Collection<Punctuation> punctuations;
+
+        private MockProcessorSupplier() {
+            this(Collections.<Punctuation>emptySet());
+        }
+
+        private MockProcessorSupplier(final Collection<Punctuation> punctuations) {
+            this.punctuations = punctuations;
+        }
+
+        @Override
+        public Processor get() {
+            final MockProcessor mockProcessor = new MockProcessor(punctuations);
+            mockProcessors.add(mockProcessor);
+            return mockProcessor;
+        }
+    }
+
+    @After
+    public void tearDown() {
+        testDriver.close();
+    }
+
+    private Topology setupSourceSinkTopology() {
+        final Topology topology = new Topology();
+
+        final String sourceName = "source";
+
+        topology.addSource(sourceName, SOURCE_TOPIC_1);
+        topology.addSink("sink", SINK_TOPIC_1, sourceName);
+
+        return topology;
+    }
+
+    private Topology setupTopologyWithTwoSubtopologies() {
+        final Topology topology = new Topology();
+
+        final String sourceName1 = "source-1";
+        final String sourceName2 = "source-2";
+
+        topology.addSource(sourceName1, SOURCE_TOPIC_1);
+        topology.addSink("sink-1", SINK_TOPIC_1, sourceName1);
+        topology.addSource(sourceName2, SINK_TOPIC_1);
+        topology.addSink("sink-2", SINK_TOPIC_2, sourceName2);
+
+        return topology;
+    }
+
+
+    private Topology setupSingleProcessorTopology() {
+        return setupSingleProcessorTopology(-1, null, null);
+    }
+
+    private Topology setupSingleProcessorTopology(final long punctuationIntervalMs,
+                                                  final PunctuationType punctuationType,
+                                                  final Punctuator callback) {
+        final Collection<Punctuation> punctuations;
+        if (punctuationIntervalMs > 0 && punctuationType != null && callback != null) {
+            punctuations = Collections.singleton(new Punctuation(punctuationIntervalMs, punctuationType, callback));
+        } else {
+            punctuations = Collections.emptySet();
+        }
+
+        final Topology topology = new Topology();
+
+        final String sourceName = "source";
+
+        topology.addSource(sourceName, SOURCE_TOPIC_1);
+        topology.addProcessor("processor", new MockProcessorSupplier(punctuations), sourceName);
+
+        return topology;
+    }
+
+    private Topology setupMultipleSourceTopology(final String... sourceTopicNames) {
+        final Topology topology = new Topology();
+
+        final String[] processorNames = new String[sourceTopicNames.length];
+        int i = 0;
+        for (final String sourceTopicName : sourceTopicNames) {
+            final String sourceName = sourceTopicName + "-source";
+            final String processorName = sourceTopicName + "-processor";
+            topology.addSource(sourceName, sourceTopicName);
+            processorNames[i++] = processorName;
+            topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName);
+        }
+        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
+
+        return topology;
+    }
+
+    private Topology setupGlobalStoreTopology(final String... sourceTopicNames) {
+        if (sourceTopicNames.length == 0) {
+            throw new IllegalArgumentException("sourceTopicNames cannot be empty");
+        }
+        final Topology topology = new Topology();
+
+        for (final String sourceTopicName : sourceTopicNames) {
+            topology.addGlobalStore(
+                Stores.<Bytes, byte[]>keyValueStoreBuilder(Stores.inMemoryKeyValueStore(sourceTopicName + "-globalStore"), null, null).withLoggingDisabled(),
+                sourceTopicName,
+                null,
+                null,
+                sourceTopicName,
+                sourceTopicName + "-processor",
+                new MockProcessorSupplier()
+            );
+        }
+
+        return topology;
+    }
+
+    @Test
+    public void shouldInitProcessor() {
+        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+        assertTrue(mockProcessors.get(0).initialized);
+    }
+
+    @Test
+    public void shouldCloseProcessor() {
+        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+
+        testDriver.close();
+        assertTrue(mockProcessors.get(0).closed);
+    }
+
+    @Test
+    public void shouldThrowForUnknownTopic() {
+        final String unknownTopic = "unknownTopic";
+        final ConsumerRecordFactory<byte[], byte[]> consumerRecordFactory = new ConsumerRecordFactory<>(
+            "unknownTopic",
+            new ByteArraySerializer(),
+            new ByteArraySerializer());
+
+        testDriver = new TopologyTestDriver(new Topology(), config);
+        try {
+            testDriver.pipeInput(consumerRecordFactory.create((byte[]) null));
+            fail("Should have throw IllegalArgumentException");
+        } catch (final IllegalArgumentException exception) {
+            assertEquals("Unknown topic: " + unknownTopic, exception.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldProcessRecordForTopic() {
+        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+
+        testDriver.pipeInput(consumerRecord1);
+        final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readOutput(SINK_TOPIC_1);
+
+        assertEquals(key1, outputRecord.key());
+        assertEquals(value1, outputRecord.value());
+        assertEquals(SINK_TOPIC_1, outputRecord.topic());
+    }
+
+    @Test
+    public void shouldSetRecordMetadata() {
+        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+
+        testDriver.pipeInput(consumerRecord1);
+
+        final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
+        assertEquals(1, processedRecords.size());
+
+        final Record record = processedRecords.get(0);
+        final Record expectedResult = new Record(consumerRecord1);
+        expectedResult.offset = 0L;
+
+        assertThat(record, equalTo(expectedResult));
+    }
+
+    @Test
+    public void shouldSendRecordViaCorrectSourceTopic() {
+        testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
+
+        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+
+        testDriver.pipeInput(consumerRecord1);
+
+        assertEquals(1, processedRecords1.size());
+        assertEquals(0, processedRecords2.size());
+
+        Record record = processedRecords1.get(0);
+        Record expectedResult = new Record(consumerRecord1);
+        expectedResult.offset = 0L;
+        assertThat(record, equalTo(expectedResult));
+
+        testDriver.pipeInput(consumerRecord2);
+
+        assertEquals(1, processedRecords1.size());
+        assertEquals(1, processedRecords2.size());
+
+        record = processedRecords2.get(0);
+        expectedResult = new Record(consumerRecord2);
+        expectedResult.offset = 0L;
+        assertThat(record, equalTo(expectedResult));
+    }
+
+    @Test
+    public void shouldUseSourceSpecificDeserializers() {
+        final Topology topology = new Topology();
+
+        final String sourceName1 = "source-1";
+        final String sourceName2 = "source-2";
+        final String processor = "processor";
+
+        topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
+        topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
+        topology.addProcessor(processor, new MockProcessorSupplier(), sourceName1, sourceName2);
+        topology.addSink(
+            "sink",
+            SINK_TOPIC_1,
+            new Serializer() {
+                @Override
+                public byte[] serialize(String topic, Object data) {
+                    if (data instanceof Long) {
+                        return Serdes.Long().serializer().serialize(topic, (Long) data);
+                    }
+                    return Serdes.Integer().serializer().serialize(topic, (Integer) data);
+                }
+                @Override
+                public void close() {}
+                @Override
+                public void configure(Map configs, boolean isKey) {}
+            },
+            new Serializer() {
+                @Override
+                public byte[] serialize(String topic, Object data) {
+                    if (data instanceof String) {
+                        return Serdes.String().serializer().serialize(topic, (String) data);
+                    }
+                    return Serdes.Double().serializer().serialize(topic, (Double) data);
+                }
+                @Override
+                public void close() {}
+                @Override
+                public void configure(Map configs, boolean isKey) {}
+            },
+            processor);
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        final ConsumerRecordFactory<Long, String> source1Factory = new ConsumerRecordFactory<>(
+            SOURCE_TOPIC_1,
+            Serdes.Long().serializer(),
+            Serdes.String().serializer());
+        final ConsumerRecordFactory<Integer, Double> source2Factory = new ConsumerRecordFactory<>(
+            SOURCE_TOPIC_2,
+            Serdes.Integer().serializer(),
+            Serdes.Double().serializer());
+
+        final Long source1Key = 42L;
+        final String source1Value = "anyString";
+        final Integer source2Key = 73;
+        final Double source2Value = 3.14;
+
+        final ConsumerRecord<byte[], byte[]> consumerRecord1 = source1Factory.create(source1Key, source1Value);
+        final ConsumerRecord<byte[], byte[]> consumerRecord2 = source2Factory.create(source2Key, source2Value);
+
+        testDriver.pipeInput(consumerRecord1);
+        OutputVerifier.compareKeyValue(
+            testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()),
+            source1Key,
+            source1Value);
+
+        testDriver.pipeInput(consumerRecord2);
+        OutputVerifier.compareKeyValue(
+            testDriver.readOutput(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer()),
+            source2Key,
+            source2Value);
+    }
+
+    @Test
+    public void shouldUseSinkSpecificSerializers() {
+        final Topology topology = new Topology();
+
+        final String sourceName1 = "source-1";
+        final String sourceName2 = "source-2";
+
+        topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
+        topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
+        topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), sourceName1);
+        topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), sourceName2);
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        final ConsumerRecordFactory<Long, String> source1Factory = new ConsumerRecordFactory<>(
+            SOURCE_TOPIC_1,
+            Serdes.Long().serializer(),
+            Serdes.String().serializer());
+        final ConsumerRecordFactory<Integer, Double> source2Factory = new ConsumerRecordFactory<>(
+            SOURCE_TOPIC_2,
+            Serdes.Integer().serializer(),
+            Serdes.Double().serializer());
+
+        final Long source1Key = 42L;
+        final String source1Value = "anyString";
+        final Integer source2Key = 73;
+        final Double source2Value = 3.14;
+
+        final ConsumerRecord<byte[], byte[]> consumerRecord1 = source1Factory.create(source1Key, source1Value);
+        final ConsumerRecord<byte[], byte[]> consumerRecord2 = source2Factory.create(source2Key, source2Value);
+
+        testDriver.pipeInput(consumerRecord1);
+        OutputVerifier.compareKeyValue(
+            testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()),
+            source1Key,
+            source1Value);
+
+        testDriver.pipeInput(consumerRecord2);
+        OutputVerifier.compareKeyValue(
+            testDriver.readOutput(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer()),
+            source2Key,
+            source2Value);
+    }
+
+    @Test
+    public void shouldProcessConsumerRecordList() {
+        testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
+
+        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+
+        final List<ConsumerRecord<byte[], byte[]>> testRecords = new ArrayList<>(2);
+        testRecords.add(consumerRecord1);
+        testRecords.add(consumerRecord2);
+
+        testDriver.pipeInput(testRecords);
+
+        assertEquals(1, processedRecords1.size());
+        assertEquals(1, processedRecords2.size());
+
+        Record record = processedRecords1.get(0);
+        Record expectedResult = new Record(consumerRecord1);
+        expectedResult.offset = 0L;
+        assertThat(record, equalTo(expectedResult));
+
+        record = processedRecords2.get(0);
+        expectedResult = new Record(consumerRecord2);
+        expectedResult.offset = 0L;
+        assertThat(record, equalTo(expectedResult));
+    }
+
+    @Test
+    public void shouldForwardRecordsFromSubtopologyToSubtopology() {
+        testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies(), config);
+
+        testDriver.pipeInput(consumerRecord1);
+
+        ProducerRecord<byte[], byte[]> outputRecord = testDriver.readOutput(SINK_TOPIC_1);
+        assertEquals(key1, outputRecord.key());
+        assertEquals(value1, outputRecord.value());
+        assertEquals(SINK_TOPIC_1, outputRecord.topic());
+
+        outputRecord = testDriver.readOutput(SINK_TOPIC_2);
+        assertEquals(key1, outputRecord.key());
+        assertEquals(value1, outputRecord.value());
+        assertEquals(SINK_TOPIC_2, outputRecord.topic());
+    }
+
+    @Test
+    public void shouldPopulateGlobalStore() {
+        testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);
+
+        testDriver.pipeInput(consumerRecord1);
+
+        final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
+        assertEquals(1, processedRecords.size());
+
+        final Record record = processedRecords.get(0);
+        final Record expectedResult = new Record(consumerRecord1);
+        expectedResult.offset = 0L;
+        assertThat(record, equalTo(expectedResult));
+    }
+
+    @Test
+    public void shouldPunctuateOnStreamsTime() {
+        final MockPunctuator mockPunctuator = new MockPunctuator();
+        testDriver = new TopologyTestDriver(
+            setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator),
+            config);
+
+        final List<Long> expectedPunctuations = new LinkedList<>();
+
+        expectedPunctuations.add(42L);
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 42L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 42L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 51L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(52L);
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 52L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 61L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(65L);
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 65L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 71L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(72L);
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 72L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(95L);
+        expectedPunctuations.add(95L);
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 95L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 101L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(102L);
+        testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 102L));
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+    }
+
+    @Test
+    public void shouldPunctuateOnWallClockTime() {
+        final MockPunctuator mockPunctuator = new MockPunctuator();
+        testDriver = new TopologyTestDriver(
+            setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, mockPunctuator),
+            config,
+            0);
+
+        final List<Long> expectedPunctuations = new LinkedList<>();
+
+        expectedPunctuations.add(5L);
+        testDriver.advanceWallClockTime(5L);
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        testDriver.advanceWallClockTime(9L);
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(15L);
+        testDriver.advanceWallClockTime(1L);
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+        expectedPunctuations.add(35L);
+        expectedPunctuations.add(35L);
+        testDriver.advanceWallClockTime(20L);
+        assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+    }
+
+    @Test
+    public void shouldReturnAllStores() {
+        final Topology topology = setupSourceSinkTopology();
+        topology.addStateStore(
+            new KeyValueStoreBuilder<>(
+                Stores.inMemoryKeyValueStore("store"),
+                Serdes.ByteArray(),
+                Serdes.ByteArray(),
+                new SystemTime())
+                .withLoggingDisabled());
+        topology.addGlobalStore(
+            new KeyValueStoreBuilder<>(
+                Stores.inMemoryKeyValueStore("globalStore"),
+                Serdes.ByteArray(),
+                Serdes.ByteArray(),
+                new SystemTime()).withLoggingDisabled(),
+            "sourceProcessorName",
+            Serdes.ByteArray().deserializer(),
+            Serdes.ByteArray().deserializer(),
+            "globalTopicName",
+            "globalProcessorName",
+            new ProcessorSupplier() {
+                @Override
+                public Processor get() {
+                    return null;
+                }
+            });
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        final Set<String> expectedStoreNames = new HashSet<>();
+        expectedStoreNames.add("store");
+        expectedStoreNames.add("globalStore");
+        assertThat(testDriver.getAllStateStores().keySet(), equalTo(expectedStoreNames));
+    }
+}
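
For reference, the smallest complete round trip these tests build on (not part of this patch; topic and node names mirror the fixtures above, and config is the same test Properties):

    final Topology topology = new Topology();
    topology.addSource("source", "source-topic-1");
    topology.addSink("sink", "sink-topic-1", "source");

    final TopologyTestDriver driver = new TopologyTestDriver(topology, config);
    final ConsumerRecordFactory<byte[], byte[]> factory =
        new ConsumerRecordFactory<>(new ByteArraySerializer(), new ByteArraySerializer());
    driver.pipeInput(factory.create("source-topic-1", new byte[0], new byte[0], 42L));
    final ProducerRecord<byte[], byte[]> output = driver.readOutput("sink-topic-1");
    driver.close();
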
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
new file mode 100644
index 0000000..f75a069
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ConsumerRecordFactoryTest {
+    private final StringSerializer stringSerializer = new StringSerializer();
+    private final IntegerSerializer integerSerializer = new IntegerSerializer();
+
+    private final String topicName = "topic";
+    private final String otherTopicName = "otherTopic";
+    private final String key = "key";
+    private final Integer value = 42;
+    private final long timestamp = 21L;
+    private final byte[] rawKey = stringSerializer.serialize(topicName, key);
+    private final byte[] rawValue = integerSerializer.serialize(topicName, value);
+
+    private final ConsumerRecordFactory<byte[], Integer> factory =
+        new ConsumerRecordFactory<>(topicName, new ByteArraySerializer(), integerSerializer, 0L);
+
+    private final ConsumerRecordFactory<byte[], Integer> defaultFactory =
+        new ConsumerRecordFactory<>(new ByteArraySerializer(), integerSerializer);
+
+    @Test
+    public void shouldAdvanceTime() {
+        factory.advanceTimeMs(3L);
+        verifyRecord(topicName, rawKey, rawValue, 3L, factory.create(topicName, rawKey, value));
+
+        factory.advanceTimeMs(2L);
+        verifyRecord(topicName, rawKey, rawValue, 5L, factory.create(topicName, rawKey, value));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullTopicName() {
+        factory.create(null, rawKey, value, timestamp);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp() {
+        factory.create(null, rawKey, value);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey() {
+        factory.create((String) null, value, timestamp);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp() {
+        factory.create((String) null, value);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs() {
+        factory.create(null, Collections.singletonList(KeyValue.pair(rawKey, value)));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps() {
+        factory.create(null, Collections.singletonList(KeyValue.pair(rawKey, value)), timestamp, 2L);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName() {
+        defaultFactory.create(rawKey, value, timestamp);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp() {
+        defaultFactory.create(rawKey, value);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey() {
+        defaultFactory.create(value, timestamp);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKeyAndDefaultTimestamp() {
+        defaultFactory.create(value);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs() {
+        defaultFactory.create(Collections.singletonList(KeyValue.pair(rawKey, value)));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps() {
+        defaultFactory.create(Collections.singletonList(KeyValue.pair(rawKey, value)), timestamp, 2L);
+    }
+
+    @Test
+    public void shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp() {
+        verifyRecord(otherTopicName, rawKey, rawValue, timestamp, factory.create(otherTopicName, rawKey, value, timestamp));
+    }
+
+    @Test
+    public void shouldCreateConsumerRecordWithTimestamp() {
+        verifyRecord(topicName, rawKey, rawValue, timestamp, factory.create(rawKey, value, timestamp));
+    }
+
+    @Test
+    public void shouldCreateConsumerRecordWithOtherTopicName() {
+        verifyRecord(otherTopicName, rawKey, rawValue, 0L, factory.create(otherTopicName, rawKey, value));
+
+        factory.advanceTimeMs(3L);
+        verifyRecord(otherTopicName, rawKey, rawValue, 3L, factory.create(otherTopicName, rawKey, value));
+    }
+
+    @Test
+    public void shouldCreateConsumerRecord() {
+        verifyRecord(topicName, rawKey, rawValue, 0L, factory.create(rawKey, value));
+
+        factory.advanceTimeMs(3L);
+        verifyRecord(topicName, rawKey, rawValue, 3L, factory.create(rawKey, value));
+    }
+
+    @Test
+    public void shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestamp() {
+        verifyRecord(otherTopicName, null, rawValue, timestamp, factory.create(otherTopicName, value, timestamp));
+    }
+
+    @Test
+    public void shouldCreateNullKeyConsumerRecordWithTimestamp() {
+        verifyRecord(topicName, null, rawValue, timestamp, factory.create(value, timestamp));
+    }
+
+    @Test
+    public void shouldCreateNullKeyConsumerRecord() {
+        verifyRecord(topicName, null, rawValue, 0L, factory.create(value));
+
+        factory.advanceTimeMs(3L);
+        verifyRecord(topicName, null, rawValue, 3L, factory.create(value));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCreateConsumerRecordsFromKeyValuePairs() {
+        final ConsumerRecordFactory<String, Integer> factory =
+            new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, 0L);
+
+        final KeyValue[] keyValuePairs = new KeyValue[5];
+        final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
+
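+        // build parallel arrays: typed key-value pairs and their serialized (raw) counterparts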
+        for (int i = 0; i < keyValuePairs.length; ++i) {
+            keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
+            rawKeyValuePairs[i] = KeyValue.pair(
+                stringSerializer.serialize(topicName, key + "-" + i),
+                integerSerializer.serialize(topicName, value + i));
+        }
+
+        final List<ConsumerRecord<byte[], byte[]>> records =
+            factory.create(Arrays.<KeyValue<String, Integer>>asList(keyValuePairs));
+
+        for (int i = 0; i < keyValuePairs.length; ++i) {
+            verifyRecord(
+                topicName,
+                (byte[]) rawKeyValuePairs[i].key,
+                (byte[]) rawKeyValuePairs[i].value,
+                0L,
+                records.get(i));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCreateConsumerRecordsFromKeyValuePairsWithTimestampAndIncrements() {
+        final ConsumerRecordFactory<String, Integer> factory =
+            new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, timestamp, 2L);
+
+        final KeyValue[] keyValuePairs = new KeyValue[5];
+        final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
+
+        for (int i = 0; i < keyValuePairs.length; ++i) {
+            keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
+            rawKeyValuePairs[i] = KeyValue.pair(
+                stringSerializer.serialize(topicName, key + "-" + i),
+                integerSerializer.serialize(topicName, value + i));
+        }
+
+        final List<ConsumerRecord<byte[], byte[]>> records =
+            factory.create(Arrays.<KeyValue<String, Integer>>asList(keyValuePairs));
+
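+        // the factory was configured with a 2 ms auto-advance, so timestamps should increase by 2 per record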
+        for (int i = 0; i < keyValuePairs.length; ++i) {
+            verifyRecord(
+                topicName,
+                (byte[]) rawKeyValuePairs[i].key,
+                (byte[]) rawKeyValuePairs[i].value,
+                timestamp + 2L * i,
+                records.get(i));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCreateConsumerRecordsFromKeyValuePairsWithCustomTimestampAndIncrementsAndNotAdvanceTime() {
+        final ConsumerRecordFactory<String, Integer> factory =
+            new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, 0L);
+
+        final KeyValue[] keyValuePairs = new KeyValue[5];
+        final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
+
+        for (int i = 0; i < keyValuePairs.length; ++i) {
+            keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
+            rawKeyValuePairs[i] = KeyValue.pair(
+                stringSerializer.serialize(topicName, key + "-" + i),
+                integerSerializer.serialize(topicName, value + i));
+        }
+
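+        // pass the start timestamp and increment per call; per the test name, this should
+        // not advance the factory's internally tracked time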
+        final List<ConsumerRecord<byte[], byte[]>> records =
+            factory.create(Arrays.<KeyValue<String, Integer>>asList(keyValuePairs), timestamp, 2L);
+
+        for (int i = 0; i < keyValuePairs.length; ++i) {
+            verifyRecord(
+                topicName,
+                (byte[]) rawKeyValuePairs[i].key,
+                (byte[]) rawKeyValuePairs[i].value,
+                timestamp + 2L * i,
+                records.get(i));
+        }
+
+        // should not have incremented internally tracked time
+        verifyRecord(topicName, null, rawValue, 0L, factory.create(value));
+    }
+
+    private void verifyRecord(final String topicName,
+                              final byte[] rawKey,
+                              final byte[] rawValue,
+                              final long timestamp,
+                              final ConsumerRecord<byte[], byte[]> record) {
+        assertEquals(topicName, record.topic());
+        assertArrayEquals(rawKey, record.key());
+        assertArrayEquals(rawValue, record.value());
+        assertEquals(timestamp, record.timestamp());
+    }
+
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/OutputVerifierTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/OutputVerifierTest.java
new file mode 100644
index 0000000..8220188
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/OutputVerifierTest.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+public class OutputVerifierTest {
+    private final byte[] key = new byte[]{0};
+    private final byte[] value = new byte[]{1};
+
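+    // fixture records: one with distinct non-null key and value (and extreme partition and
+    // timestamp values), and one with both key and value set to null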
+    private final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
+        "someTopic",
+        Integer.MAX_VALUE,
+        Long.MAX_VALUE,
+        key,
+        value
+    );
+
+    private final ProducerRecord<byte[], byte[]> nullKeyValueRecord = new ProducerRecord<byte[], byte[]>(
+        "someTopic",
+        Integer.MAX_VALUE,
+        Long.MAX_VALUE,
+        null,
+        null
+    );
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordForCompareValue() {
+        OutputVerifier.compareValue(null, value);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValue() {
+        OutputVerifier.compareValue((ProducerRecord<byte[], byte[]>) null, producerRecord);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullExpectedRecordForCompareValue() {
+        OutputVerifier.compareValue(producerRecord, (ProducerRecord<byte[], byte[]>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(null, key, value);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(null, producerRecord);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullExpectedRecordForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(producerRecord, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(null, value, 0L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(null, producerRecord);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullExpectedRecordForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(producerRecord, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(null, key, value, 0L);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(null, producerRecord);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, null);
+    }
+
+    @Test
+    public void shouldPassIfValueIsEqualForCompareValue() {
+        OutputVerifier.compareValue(producerRecord, value);
+    }
+
+    @Test
+    public void shouldPassIfValueIsEqualWithNullForCompareValue() {
+        OutputVerifier.compareValue(nullKeyValueRecord, (byte[]) null);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareValue() {
+        OutputVerifier.compareValue(producerRecord, key);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareValue() {
+        OutputVerifier.compareValue(producerRecord, (byte[]) null);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareValue() {
+        OutputVerifier.compareValue(nullKeyValueRecord, value);
+    }
+
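+    // when comparing against another ProducerRecord, compareValue should consider only the
+    // value, ignoring topic, partition, timestamp, and key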
+    @Test
+    public void shouldPassIfValueIsEqualForCompareValueWithProducerRecord() {
+        OutputVerifier.compareValue(producerRecord, new ProducerRecord<>(
+            "otherTopic",
+            0,
+            0L,
+            value,
+            value
+        ));
+    }
+
+    @Test
+    public void shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord() {
+        OutputVerifier.compareValue(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+            "otherTopic",
+            0,
+            0L,
+            value,
+            null
+        ));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareValueWithProducerRecord() {
+        OutputVerifier.compareValue(producerRecord, new ProducerRecord<>(
+            "sameTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            value,
+            key
+        ));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord() {
+        OutputVerifier.compareValue(producerRecord, new ProducerRecord<byte[], byte[]>(
+            "sameTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            value,
+            null
+        ));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueWithProducerRecord() {
+        OutputVerifier.compareValue(nullKeyValueRecord, new ProducerRecord<>(
+            "sameTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            value,
+            value
+        ));
+    }
+
+    @Test
+    public void shouldPassIfKeyAndValueIsEqualForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(producerRecord, key, value);
+    }
+
+    @Test
+    public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(nullKeyValueRecord, null, null);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(producerRecord, value, value);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(producerRecord, null, value);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullReverseForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                null,
+                value),
+            key,
+            value);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(producerRecord, key, key);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(producerRecord, key, null);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareKeyValue() {
+        OutputVerifier.compareKeyValue(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                key,
+                null),
+            key,
+            value);
+    }
+
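+    // compareKeyValue against another ProducerRecord should consider key and value only,
+    // ignoring topic, partition, and timestamp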
+    @Test
+    public void shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
+            "otherTopic",
+            0,
+            0L,
+            key,
+            value));
+    }
+
+    @Test
+    public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+            "otherTopic",
+            0,
+            0L,
+            null,
+            null));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            value,
+            value));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<byte[], byte[]>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            null,
+            value));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullReverseForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                null,
+                value),
+            producerRecord);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            key,
+            key));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<byte[], byte[]>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            key,
+            null));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareKeyValueWithProducerRecord() {
+        OutputVerifier.compareKeyValue(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                key,
+                null),
+            producerRecord);
+    }
+
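+    // compareValueTimestamp should consider value and timestamp together (the key is still ignored)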
+    @Test
+    public void shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(producerRecord, value, Long.MAX_VALUE);
+    }
+
+    @Test
+    public void shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(nullKeyValueRecord, null, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(producerRecord, key, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(producerRecord, null, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(nullKeyValueRecord, value, Long.MAX_VALUE);
+    }
+
+    @Test
+    public void shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord() {
+        OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
+            "otherTopic",
+            0,
+            Long.MAX_VALUE,
+            value,
+            value
+        ));
+    }
+
+    @Test
+    public void shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestampWithProducerRecord() {
+        OutputVerifier.compareValueTimestamp(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+            "otherTopic",
+            0,
+            Long.MAX_VALUE,
+            value,
+            null
+        ));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord() {
+        OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            key,
+            key
+        ));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord() {
+        OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            key,
+            null
+        ));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord() {
+        OutputVerifier.compareValueTimestamp(nullKeyValueRecord, new ProducerRecord<>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            key,
+            value
+        ));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfTimestampIsDifferentForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(producerRecord, value, 0);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfTimestampIsDifferentWithNullReverseForCompareValueTimestamp() {
+        OutputVerifier.compareValueTimestamp(nullKeyValueRecord, null, 0);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord() {
+        OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            0L,
+            key,
+            value
+        ));
+    }
+
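+    // compareKeyValueTimestamp should consider key, value, and timestamp together,
+    // still ignoring topic and partition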
+    @Test
+    public void shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, key, value, Long.MAX_VALUE);
+    }
+
+    @Test
+    public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(nullKeyValueRecord, null, null, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, value, value, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, null, value, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullReverseForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                null,
+                value),
+            key,
+            value,
+            Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, key, key, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, key, null, Long.MAX_VALUE);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareKeyValueTimestamp() {
+        OutputVerifier.compareKeyValueTimestamp(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                key,
+                null),
+            key,
+            value,
+            Long.MAX_VALUE);
+    }
+
+    @Test
+    public void shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
+            "otherTopic",
+            0,
+            Long.MAX_VALUE,
+            key,
+            value));
+    }
+
+    @Test
+    public void shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+            "otherTopic",
+            0,
+            Long.MAX_VALUE,
+            null,
+            null));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            value,
+            value));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            null,
+            value));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfKeyIsDifferentWithNullReverseForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                null,
+                value),
+            producerRecord);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            key,
+            key));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
+            "someTopic",
+            Integer.MAX_VALUE,
+            Long.MAX_VALUE,
+            key,
+            null));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void shouldFailIfValueIsDifferentWithNullReverseForCompareKeyValueTimestampWithProducerRecord() {
+        OutputVerifier.compareKeyValueTimestamp(
+            new ProducerRecord<byte[], byte[]>(
+                "someTopic",
+                Integer.MAX_VALUE,
+                Long.MAX_VALUE,
+                key,
+                null),
+            producerRecord);
+    }
+
+}
