Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/26 18:30:46 UTC

[kafka] branch trunk updated: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [partial] (#4832)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 885abbf  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [partial] (#4832)
885abbf is described below

commit 885abbfcd40aab57acec278d976956f07be15090
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Thu Apr 26 19:30:42 2018 +0100

    KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [partial] (#4832)
    
    * Remove ProcessorTopologyTestDriver from TopologyTest
    * Fix ProcessorTopologyTest
    * Remove ProcessorTopologyTestDriver and InternalTopologyAccessor
    * Partially refactored StreamsBuilderTest but missing one test
    * Refactor KStreamBuilderTest
    * Refactor AbstractStreamTest
    * Further cleanup of AbstractStreamTest
    * Refactor GlobalKTableJoinsTest
    * Refactor InternalStreamsBuilderTest
    * Fix circular dependency in build.gradle
    * Refactor KGroupedStreamImplTest
    * Partial modifications to KGroupedTableImplTest
    * Refactor KGroupedTableImplTest
    * Refactor KStreamBranchTest
    * Refactor KStreamFilterTest
    * Refactor KStreamFlatMapTest and KStreamFlatMapValuesTest
    * Refactor KStreamForeachTest
    * Refactor KStreamGlobalKTableJoinTest
    * Refactor KStreamGlobalKTableLeftJoinTest
    * Refactor KStreamImplTest
    * Refactor KStreamKStreamJoinTest
    * Refactor KStreamKStreamLeftJoinTest
    * Refactor KStreamKTableJoinTest
    * Refactor KStreamKTableLeftJoinTest
    * Refactor KStreamMapTest and KStreamMapValuesTest
    * Refactor KStreamPeekTest and KStreamTransformTest
    * Refactor KStreamSelectKeyTest
    * Refactor KStreamTransformValuesTest
    * Refactor KStreamWindowAggregateTest
    * Add @Deprecated annotation to KStreamTestDriver and roll back failing tests in StreamsBuilderTest and KTableAggregateTest
    * Refactor KTableFilterTest
    * Refactor KTableForeachTest
    * Add getter for ProcessorTopology, and simplify tests in StreamsBuilderTest
    * Refactor KTableImplTest
    * Remove unused imports
    * Refactor KTableAggregateTest
    * Fix style errors
    * Fix gradle build
    * Address reviewer comments:
      - Remove properties new instance
      - Remove extraneous line
      - Remove unnecessary TopologyTestDriver instances from StreamsBuilderTest
      - Move props.clear() to @After
      - Clarify use of timestamp in KStreamFlatMapValuesTest
      - Keep test using old Punctuator in KStreamTransformTest
      - Add comment to clarify clock advances in KStreamTransformTest
      - Add TopologyTestDriverWrapper class to access the protected constructor of TopologyTestDriver
      - Revert KTableImplTest.testRepartition to KStreamTestDriver to avoid exposing the TopologyTestDriver processor topology
      - Revert partially migrated classes: KTableAggregateTest, KTableFilterTest, and KTableImplTest
    * Rebase on current trunk and fix conflicts
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
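
    For orientation, the migration applied throughout these tests follows one
    pattern: build the topology, drive it with the public TopologyTestDriver,
    and pipe records in through a ConsumerRecordFactory. A minimal sketch
    (imports plus a method body; the topic, store, and application-id names
    here are illustrative, not taken from any single test):

        import java.util.Properties;
        import org.apache.kafka.common.serialization.LongSerializer;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.apache.kafka.common.utils.Bytes;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.TopologyTestDriver;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.state.KeyValueStore;
        import org.apache.kafka.streams.test.ConsumerRecordFactory;
        import org.apache.kafka.test.TestUtils;

        // A topology backed by a named store.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.table("topic", Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
                .withKeySerde(Serdes.Long())
                .withValueSerde(Serdes.String()));

        // The driver needs no running broker; bootstrap.servers is a dummy value.
        final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "migration-example");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);

        // pipeInput() replaces the old setTime()/process() pair; a record
        // timestamp can be passed as the last argument of create().
        final ConsumerRecordFactory<Long, String> recordFactory =
                new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
        driver.pipeInput(recordFactory.create("topic", 1L, "value1", 0L));

        // Stores are fetched by name; no cast or flushState() call is needed.
        final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
        // store.get(1L) now returns "value1"
        driver.close();
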
---
 .../apache/kafka/streams/StreamsBuilderTest.java   | 113 ++---
 .../org/apache/kafka/streams/TopologyTest.java     |   4 +-
 ...ccessor.java => TopologyTestDriverWrapper.java} |  17 +-
 .../kafka/streams/kstream/KStreamBuilderTest.java  |  53 ++-
 .../kstream/internals/AbstractStreamTest.java      |  23 +-
 .../kstream/internals/GlobalKTableJoinsTest.java   |  45 +-
 .../internals/InternalStreamsBuilderTest.java      |  38 +-
 .../kstream/internals/KGroupedStreamImplTest.java  | 143 +++---
 .../kstream/internals/KGroupedTableImplTest.java   |  67 +--
 .../kstream/internals/KStreamBranchTest.java       |  39 +-
 .../kstream/internals/KStreamFilterTest.java       |  46 +-
 .../kstream/internals/KStreamFlatMapTest.java      |  37 +-
 .../internals/KStreamFlatMapValuesTest.java        |  42 +-
 .../kstream/internals/KStreamForeachTest.java      |  44 +-
 .../internals/KStreamGlobalKTableJoinTest.java     |  51 ++-
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  34 +-
 .../streams/kstream/internals/KStreamImplTest.java |  93 ++--
 .../kstream/internals/KStreamKStreamJoinTest.java  | 354 ++++++++-------
 .../internals/KStreamKStreamLeftJoinTest.java      | 122 +++--
 .../kstream/internals/KStreamKTableJoinTest.java   |  52 ++-
 .../internals/KStreamKTableLeftJoinTest.java       |  35 +-
 .../streams/kstream/internals/KStreamMapTest.java  |  39 +-
 .../kstream/internals/KStreamMapValuesTest.java    |  43 +-
 .../streams/kstream/internals/KStreamPeekTest.java |  39 +-
 .../kstream/internals/KStreamSelectKeyTest.java    |  38 +-
 .../kstream/internals/KStreamTransformTest.java    | 107 ++++-
 .../internals/KStreamTransformValuesTest.java      |  44 +-
 .../internals/KStreamWindowAggregateTest.java      | 200 +++------
 .../kstream/internals/KTableForeachTest.java       |  39 +-
 .../streams/processor/TopologyBuilderTest.java     |  17 +-
 .../internals/InternalTopologyBuilderTest.java     |   4 +-
 .../processor/internals/ProcessorTopologyTest.java | 115 ++---
 .../org/apache/kafka/test/KStreamTestDriver.java   |   1 +
 .../kafka/test/ProcessorTopologyTestDriver.java    | 490 ---------------------
 .../apache/kafka/streams/TopologyTestDriver.java   |  28 +-
 35 files changed, 1290 insertions(+), 1366 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4a496b8..d3e01fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -28,13 +30,14 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockPredicate;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Rule;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -43,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -54,9 +58,26 @@ import static org.junit.Assert.assertFalse;
 public class StreamsBuilderTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-builder-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test(expected = TopologyException.class)
     public void testFrom() {
@@ -70,9 +91,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes, String>allGoodPredicate());
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-
-        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -85,9 +104,7 @@ public class StreamsBuilderTest {
                 .filter(MockPredicate.<Bytes, String>allGoodPredicate(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-
-        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
@@ -99,9 +116,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.<String>noOpValueMapper());
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-
-        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -114,9 +129,7 @@ public class StreamsBuilderTest {
                 .mapValues(MockMapper.<String>noOpValueMapper(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-
-        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
@@ -129,14 +142,11 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
         builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-
-        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
         assertThat(topology.stateStores().size(), equalTo(2));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
         assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(), is(true));
-
     }
 
     @Test
@@ -145,9 +155,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
         builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store")), MockValueJoiner.TOSTRING_JOINER);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-
-        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
         assertThat(topology.stateStores().size(), equalTo(3));
         assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Collections.singleton("store")));
@@ -159,9 +167,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> table = builder.table("table-topic");
         builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-
-        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
         assertThat(topology.stateStores().size(), equalTo(1));
         assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -177,10 +183,10 @@ public class StreamsBuilderTest {
 
         source.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
-        driver.process("topic-source", "A", "aa");
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
         // no exception was thrown
         assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
@@ -197,10 +203,10 @@ public class StreamsBuilderTest {
         source.process(sourceProcessorSupplier);
         through.process(throughProcessorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
-        driver.process("topic-source", "A", "aa");
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
         assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
         assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
@@ -218,13 +224,13 @@ public class StreamsBuilderTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
-        driver.process(topic1, "A", "aa");
-        driver.process(topic2, "B", "bb");
-        driver.process(topic2, "C", "cc");
-        driver.process(topic1, "D", "dd");
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
     }
@@ -244,12 +250,13 @@ public class StreamsBuilderTest {
                 .withValueSerde(Serdes.String()))
                 .toStream().foreach(action);
 
-        driver.setUp(builder, TestUtils.tempDirectory());
-        driver.setTime(0L);
-        driver.process(topic, 1L, "value1");
-        driver.process(topic, 2L, "value2");
-        driver.flushState();
-        final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store");
+        driver = new TopologyTestDriver(builder.build(), props);
+
+        final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+        driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+        driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+
+        final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
         assertThat(results.get(1L), equalTo("value1"));
@@ -262,12 +269,14 @@ public class StreamsBuilderTest {
         builder.globalTable(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
                 .withKeySerde(Serdes.Long())
                 .withValueSerde(Serdes.String()));
-        driver.setUp(builder, TestUtils.tempDirectory());
-        driver.setTime(0L);
-        driver.process(topic, 1L, "value1");
-        driver.process(topic, 2L, "value2");
-        driver.flushState();
-        final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store");
+
+        driver = new TopologyTestDriver(builder.build(), props);
+
+        final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+        driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+        driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+        final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
     }
@@ -295,12 +304,12 @@ public class StreamsBuilderTest {
     }
     
     @Test(expected = TopologyException.class)
-    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+    public void shouldThrowExceptionWhenNoTopicPresent() {
         builder.stream(Collections.<String>emptyList());
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+    public void shouldThrowExceptionWhenTopicNamesAreNull() {
         builder.stream(Arrays.<String>asList(null, null));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 6834091..eee3386 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStore;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Test;
@@ -261,7 +260,6 @@ public class TopologyTest {
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
         mockStoreBuilder();
         EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStore("store", false));
         EasyMock.replay(storeBuilder);
@@ -274,7 +272,7 @@ public class TopologyTest {
             .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
 
         try {
-            new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+            new TopologyTestDriver(topology, config);
             fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
             final String error = e.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
similarity index 58%
rename from streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
rename to streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
index c3c4504..fa976a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -14,19 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
+import java.util.Properties;
 
 /**
- * This class is meant for testing purposes only and allows the testing of
- * topologies by using the  {@link org.apache.kafka.test.ProcessorTopologyTestDriver}
+ *  This class allows the instantiation of a {@link TopologyTestDriver} using a
+ *  {@link InternalTopologyBuilder} by exposing a protected constructor.
+ *
+ *  It should be used only for testing, and should be removed once the deprecated
+ *  classes {@link org.apache.kafka.streams.kstream.KStreamBuilder} and
+ *  {@link org.apache.kafka.streams.processor.TopologyBuilder} are removed.
  */
-public class InternalTopologyAccessor {
+public class TopologyTestDriverWrapper extends TopologyTestDriver {
 
-    public static InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) {
-        return topology.internalTopologyBuilder;
+    public TopologyTestDriverWrapper(final InternalTopologyBuilder builder,
+                                     final Properties config) {
+        super(builder, config);
     }
 }
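
A sketch of how the wrapper is used, mirroring the KStreamBuilderTest changes
below (`props` and `recordFactory` stand for the fixtures that test sets up in
its @Before method):

    // The deprecated KStreamBuilder exposes an InternalTopologyBuilder rather
    // than a Topology, so tests reach TopologyTestDriver's protected
    // constructor through the wrapper's public one.
    final KStreamBuilder builder = new KStreamBuilder();
    builder.setApplicationId("app-id");
    final TopologyTestDriverWrapper driver =
            new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
    driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
    driver.close();
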
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index f9949d3..81bdb31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -18,7 +18,10 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
@@ -26,19 +29,21 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -55,12 +60,27 @@ public class KStreamBuilderTest {
     private static final String APP_ID = "app-id";
 
     private final KStreamBuilder builder = new KStreamBuilder();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private TopologyTestDriverWrapper driver;
+    private final Properties props = new Properties();
 
     @Before
-    public void setUp() {
+    public void setup() {
         builder.setApplicationId(APP_ID);
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-builder-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @Test(expected = TopologyBuilderException.class)
@@ -93,10 +113,8 @@ public class KStreamBuilderTest {
 
         source.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
-
-        driver.process("topic-source", "A", "aa");
+        driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
         // no exception was thrown
         assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
@@ -113,10 +131,8 @@ public class KStreamBuilderTest {
         source.process(sourceProcessorSupplier);
         through.process(throughProcessorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
-
-        driver.process("topic-source", "A", "aa");
+        driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
         assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
         assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
@@ -147,13 +163,12 @@ public class KStreamBuilderTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
 
-        driver.process(topic1, "A", "aa");
-        driver.process(topic2, "B", "bb");
-        driver.process(topic2, "C", "cc");
-        driver.process(topic1, "D", "dd");
+        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index dcfb9ba..2aa07f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -16,20 +16,25 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import java.util.Properties;
 import java.util.Random;
 
 import static org.easymock.EasyMock.createMock;
@@ -40,10 +45,6 @@ import static org.junit.Assert.assertTrue;
 
 public class AbstractStreamTest {
 
-    private final String topicName = "topic";
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-
     @Test
     public void testToInternlValueTransformerSupplierSuppliesNewTransformers() {
         final ValueTransformerSupplier vts = createMock(ValueTransformerSupplier.class);
@@ -82,9 +83,15 @@ public class AbstractStreamTest {
 
         stream.randomFilter().process(processor);
 
-        driver.setUp(builder);
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+
+        final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, "V" + expectedKey);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
         assertTrue(processor.processed.size() <= expectedKeys.length);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index eebb7d2..8c50afe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -17,22 +17,25 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
@@ -43,17 +46,15 @@ public class GlobalKTableJoinsTest {
     private final Map<String, String> results = new HashMap<>();
     private final String streamTopic = "stream";
     private final String globalTopic = "global";
-    private File stateDir;
     private GlobalKTable<String, String> global;
     private KStream<String, String> stream;
     private KeyValueMapper<String, String, String> keyValueMapper;
     private ForeachAction<String, String> action;
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private TopologyTestDriver driver;
+
 
     @Before
     public void setUp() {
-        stateDir = TestUtils.tempDirectory();
         final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
         global = builder.globalTable(globalTopic, consumed);
         stream = builder.stream(streamTopic, consumed);
@@ -71,6 +72,14 @@ public class GlobalKTableJoinsTest {
         };
     }
 
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void shouldLeftJoinWithStream() {
         stream
@@ -100,16 +109,22 @@ public class GlobalKTableJoinsTest {
     }
 
     private void verifyJoin(final Map<String, String> expected) {
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "global-ktable-joins-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+
+        driver = new TopologyTestDriver(builder.build(), props);
+
         // write some data to the global table
-        driver.process(globalTopic, "a", "A");
-        driver.process(globalTopic, "b", "B");
+        driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
+        driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
         //write some data to the stream
-        driver.process(streamTopic, "1", "a");
-        driver.process(streamTopic, "2", "b");
-        driver.process(streamTopic, "3", "c");
-        driver.flushState();
+        driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
+        driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
+        driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
 
         assertEquals(expected, results);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index b9ba608..76ca495 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -30,11 +30,9 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -58,8 +56,6 @@ public class InternalStreamsBuilderTest {
     private static final String APP_ID = "app-id";
 
     private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-
-    private KStreamTestDriver driver = null;
     private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
     private final String storePrefix = "prefix-";
     private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
@@ -70,14 +66,6 @@ public class InternalStreamsBuilderTest {
         builder.internalTopologyBuilder.setApplicationId(APP_ID);
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void testNewName() {
         assertEquals("X-0000000000", builder.newProcessorName("X-"));
@@ -105,7 +93,7 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
         final String topic1 = "topic-1";
         final String topic2 = "topic-2";
         final String topic3 = "topic-3";
@@ -138,7 +126,7 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() throws Exception {
+    public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
         KTable table1 = builder.table("topic2",
                                       consumed,
                                       new MaterializedInternal<>(
@@ -158,7 +146,7 @@ public class InternalStreamsBuilderTest {
     }
     
     @Test
-    public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
+    public void shouldBuildGlobalTableWithNonQueryableStoreName() {
         final GlobalKTable<String, String> table1 = builder.globalTable(
             "topic2",
             consumed,
@@ -171,7 +159,7 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
+    public void shouldBuildGlobalTableWithQueryaIbleStoreName() {
         final GlobalKTable<String, String> table1 = builder.globalTable(
             "topic2",
             consumed,
@@ -184,7 +172,7 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldBuildSimpleGlobalTableTopology() throws Exception {
+    public void shouldBuildSimpleGlobalTableTopology() {
         builder.globalTable("table",
                             consumed,
                             new MaterializedInternal<>(
@@ -199,7 +187,7 @@ public class InternalStreamsBuilderTest {
         assertEquals("globalTable", stateStores.get(0).name());
     }
 
-    private void doBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+    private void doBuildGlobalTopologyWithAllGlobalTables() {
         final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
 
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -210,7 +198,7 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+    public void shouldBuildGlobalTopologyWithAllGlobalTables() {
         builder.globalTable("table",
                             consumed,
                             new MaterializedInternal<>(
@@ -224,7 +212,7 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldAddGlobalTablesToEachGroup() throws Exception {
+    public void shouldAddGlobalTablesToEachGroup() {
         final String one = "globalTable";
         final String two = "globalTable2";
 
@@ -269,7 +257,7 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+    public void shouldMapStateStoresToCorrectSourceTopics() {
         final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
 
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
@@ -367,14 +355,14 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
+    public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.stream(Collections.singleton("topic"), consumed);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
-    public void shouldUseProvidedTimestampExtractor() throws Exception {
+    public void shouldUseProvidedTimestampExtractor() {
         final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
         builder.stream(Collections.singleton("topic"), consumed);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
@@ -382,14 +370,14 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
+    public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.table("topic", consumed, materialized);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
-    public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
+    public void ktableShouldUseProvidedTimestampExtractor() {
         final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
         builder.table("topic", consumed, materialized);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index c74d0dc..b9ca30f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -20,10 +20,13 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -43,13 +46,13 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -57,6 +60,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -72,13 +76,30 @@ public class KGroupedStreamImplTest {
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private final StreamsBuilder builder = new StreamsBuilder();
     private KGroupedStream<String, String> groupedStream;
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
+
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-stream-impl-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @SuppressWarnings("deprecation")
@@ -203,20 +224,13 @@ public class KGroupedStreamImplTest {
     }
 
     private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
-        driver.setUp(builder, TestUtils.tempDirectory());
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "2", "2");
-        driver.setTime(30);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(70);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(90);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(100);
-        driver.process(TOPIC, "1", "1");
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
         assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -284,20 +298,13 @@ public class KGroupedStreamImplTest {
     }
 
     private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
-        driver.setUp(builder, TestUtils.tempDirectory());
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "2", "2");
-        driver.setTime(30);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(70);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(90);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(100);
-        driver.process(TOPIC, "1", "1");
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
         assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -334,20 +341,13 @@ public class KGroupedStreamImplTest {
     }
 
     private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
-        driver.setUp(builder, TestUtils.tempDirectory());
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "A");
-        driver.setTime(15);
-        driver.process(TOPIC, "2", "Z");
-        driver.setTime(30);
-        driver.process(TOPIC, "1", "B");
-        driver.setTime(70);
-        driver.process(TOPIC, "1", "A");
-        driver.setTime(90);
-        driver.process(TOPIC, "1", "B");
-        driver.setTime(100);
-        driver.process(TOPIC, "1", "C");
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
         assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -556,8 +556,7 @@ public class KGroupedStreamImplTest {
 
         processData();
 
-        @SuppressWarnings("unchecked") final KeyValueStore<String, Long> count =
-            (KeyValueStore<String, Long>) driver.allStateStores().get("count");
+        final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
 
         assertThat(count.get("1"), equalTo(3L));
         assertThat(count.get("2"), equalTo(1L));
@@ -571,10 +570,10 @@ public class KGroupedStreamImplTest {
         processData();
         LogCaptureAppender.unregister(appender);
 
-        final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+        final Map<MetricName, ? extends Metric> metrics = driver.metrics();
         assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
         assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
     }
 
 
@@ -589,7 +588,7 @@ public class KGroupedStreamImplTest {
 
         processData();
 
-        final KeyValueStore<String, String> reduced = (KeyValueStore<String, String>) driver.allStateStores().get("reduce");
+        final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
 
         assertThat(reduced.get("1"), equalTo("A+C+D"));
         assertThat(reduced.get("2"), equalTo("B"));
@@ -609,10 +608,10 @@ public class KGroupedStreamImplTest {
         processData();
         LogCaptureAppender.unregister(appender);
 
-        final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+        final Map<MetricName, ? extends Metric> metrics = driver.metrics();
         assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
         assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
     }
 
 
@@ -628,7 +627,7 @@ public class KGroupedStreamImplTest {
 
         processData();
 
-        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
+        final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
 
         assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
         assertThat(aggregate.get("2"), equalTo("0+B"));
@@ -658,29 +657,25 @@ public class KGroupedStreamImplTest {
     }
 
     private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
-        driver.setTime(0);
-        driver.process(TOPIC, "1", "A");
-        driver.process(TOPIC, "2", "B");
-        driver.process(TOPIC, "1", "C");
-        driver.process(TOPIC, "1", "D");
-        driver.process(TOPIC, "3", "E");
-        driver.process(TOPIC, "3", "F");
-        driver.process(TOPIC, "3", null);
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "A"));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "B"));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "C"));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "D"));
+        driver.pipeInput(recordFactory.create(TOPIC, "3", "E"));
+        driver.pipeInput(recordFactory.create(TOPIC, "3", "F"));
+        driver.pipeInput(recordFactory.create(TOPIC, "3", null));
     }
 
     private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-        driver.setTime(0);
-        driver.process(TOPIC, "1", "A");
-        driver.process(TOPIC, "2", "B");
-        driver.process(TOPIC, "3", "C");
-        driver.setTime(500);
-        driver.process(TOPIC, "1", "A");
-        driver.process(TOPIC, "1", "A");
-        driver.process(TOPIC, "2", "B");
-        driver.process(TOPIC, "2", "B");
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
+        driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
         assertThat(results, equalTo(Arrays.asList(
             KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
             KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 3bbd7e2..742f349 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -17,11 +17,15 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.serialization.DoubleSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KTable;
@@ -30,18 +34,19 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -55,14 +60,29 @@ public class KGroupedTableImplTest {
     private final StreamsBuilder builder = new StreamsBuilder();
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
     private final String topic = "input";
 
     @Before
     public void before() {
         groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
                 .groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
+
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-table-impl-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @Test
@@ -122,6 +142,7 @@ public class KGroupedTableImplTest {
 
     private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) {
         final Map<String, Integer> results = new HashMap<>();
+        final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
         reduced.foreach(new ForeachAction<String, Integer>() {
             @Override
             public void apply(final String key, final Integer value) {
@@ -129,20 +150,17 @@ public class KGroupedTableImplTest {
             }
         });
 
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
-        driver.setTime(10L);
-        driver.process(topic, "A", 1.1);
-        driver.process(topic, "B", 2.2);
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
+        driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
 
         assertEquals(Integer.valueOf(1), results.get("A"));
         assertEquals(Integer.valueOf(2), results.get("B"));
 
-        driver.process(topic, "A", 2.6);
-        driver.process(topic, "B", 1.3);
-        driver.process(topic, "A", 5.7);
-        driver.process(topic, "B", 6.2);
-        driver.flushState();
+        driver.pipeInput(recordFactory.create(topic, "A", 2.6, 10));
+        driver.pipeInput(recordFactory.create(topic, "B", 1.3, 10));
+        driver.pipeInput(recordFactory.create(topic, "A", 5.7, 10));
+        driver.pipeInput(recordFactory.create(topic, "B", 6.2, 10));
 
         assertEquals(Integer.valueOf(5), results.get("A"));
         assertEquals(Integer.valueOf(6), results.get("B"));
@@ -212,7 +230,7 @@ public class KGroupedTableImplTest {
                                 .withValueSerde(Serdes.Integer()));
 
         doShouldReduce(reduced, topic);
-        final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.allStateStores().get("reduce");
+        final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
         assertThat(reduce.get("A"), equalTo(5));
         assertThat(reduce.get("B"), equalTo(6));
     }
@@ -229,7 +247,7 @@ public class KGroupedTableImplTest {
                                .withValueSerde(Serdes.Long()));
 
         processData(topic);
-        final KeyValueStore<String, Long> counts = (KeyValueStore<String, Long>) driver.allStateStores().get("count");
+        final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
         assertThat(counts.get("1"), equalTo(3L));
         assertThat(counts.get("2"), equalTo(2L));
     }
@@ -249,7 +267,7 @@ public class KGroupedTableImplTest {
                                    .withKeySerde(Serdes.String()));
 
         processData(topic);
-        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
+        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
         assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
         assertThat(aggregate.get("2"), equalTo("0+2+2"));
     }
@@ -310,13 +328,12 @@ public class KGroupedTableImplTest {
     }
 
     private void processData(final String topic) {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
-        driver.setTime(0L);
-        driver.process(topic, "A", "1");
-        driver.process(topic, "B", "1");
-        driver.process(topic, "C", "1");
-        driver.process(topic, "D", "2");
-        driver.process(topic, "E", "2");
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props);
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+        driver.pipeInput(recordFactory.create(topic, "A", "1"));
+        driver.pipeInput(recordFactory.create(topic, "B", "1"));
+        driver.pipeInput(recordFactory.create(topic, "C", "1"));
+        driver.pipeInput(recordFactory.create(topic, "D", "2"));
+        driver.pipeInput(recordFactory.create(topic, "E", "2"));
     }
 }
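A note on the store assertions in this file: TopologyTestDriver exposes the
topology's state stores directly, which is what replaces the old
driver.allStateStores() map. A minimal sketch of the two accessors used above,
assuming a driver already built as in processData():

    import org.apache.kafka.streams.state.KeyValueStore;

    // typed convenience accessor for key-value stores
    final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");

    // generic accessor returning a raw StateStore, hence the casts in the tests
    @SuppressWarnings("unchecked")
    final KeyValueStore<String, String> aggregate =
            (KeyValueStore<String, String>) driver.getStateStore("aggregate");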
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 702631a..a70bc37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -16,26 +16,51 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Array;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
 public class KStreamBranchTest {
 
-    private String topicName = "topic";
+    private final String topicName = "topic";
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void before() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-branch-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @SuppressWarnings("unchecked")
     @Test
@@ -78,9 +103,9 @@ public class KStreamBranchTest {
             branches[i].process(processors[i]);
         }
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, "V" + expectedKey);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
         assertEquals(3, processors[0].processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index f1a6152..a67d688 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -16,25 +16,51 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.junit.Assert.assertEquals;
 
 public class KStreamFilterTest {
 
-    private String topicName = "topic";
-  
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final String topicName = "topic";
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void before() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-filter-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
 
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
         public boolean test(Integer key, String value) {
@@ -54,9 +80,9 @@ public class KStreamFilterTest {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filter(isMultipleOfThree).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, "V" + expectedKey);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
         assertEquals(2, processor.processed.size());
@@ -74,9 +100,9 @@ public class KStreamFilterTest {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filterNot(isMultipleOfThree).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, "V" + expectedKey);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
         assertEquals(5, processor.processed.size());
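Since the same @Before/@After scaffolding now appears in each of these test
classes, here is the shared driver lifecycle in one self-contained sketch. The
topology, application id, and topic names are placeholders, and the bootstrap
server is never actually contacted by the test driver:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class DriverLifecycleSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input").to("output"); // trivial pass-through topology

            final Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "lifecycle-sketch");
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
            // the tests above also point STATE_DIR_CONFIG at a temp directory;
            // without it the driver falls back to the default state directory
            props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
            try {
                final ConsumerRecordFactory<String, String> factory =
                        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
                // each pipeInput() is processed synchronously through the topology
                driver.pipeInput(factory.create("input", "key", "value"));
            } finally {
                driver.close(); // flushes and releases the driver's state
            }
        }
    }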
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 59dad47..e414218 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -16,27 +16,52 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
 public class KStreamFlatMapTest {
 
     private String topicName = "topic";
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    @Before
+    public void before() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testFlatMap() {
@@ -63,9 +88,9 @@ public class KStreamFlatMapTest {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.flatMap(mapper).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, "V" + expectedKey);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
         assertEquals(6, processor.processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index ecfa3aa..14213c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -16,27 +16,51 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Properties;
 
 import static org.junit.Assert.assertArrayEquals;
 
 public class KStreamFlatMapValuesTest {
 
     private String topicName = "topic";
+    private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void before() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-values-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testFlatMapValues() {
@@ -59,9 +83,10 @@ public class KStreamFlatMapValuesTest {
         final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
         stream.flatMapValues(mapper).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (final int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, expectedKey);
+            // pass an explicit timestamp: with an Integer key and an Integer value
+            // the shorter create() overloads would otherwise be ambiguous
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
         }
 
         String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
@@ -92,9 +117,10 @@ public class KStreamFlatMapValuesTest {
 
         stream.flatMapValues(mapper).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (final int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, expectedKey);
+            // pass an explicit timestamp: with an Integer key and an Integer value
+            // the shorter create() overloads would otherwise be ambiguous
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
         }
 
         String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index e8854fc..b975c96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -16,32 +16,58 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.junit.Rule;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
 public class KStreamForeachTest {
 
-    final private String topicName = "topic";
+    private final String topicName = "topic";
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    @Before
+    public void before() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-foreach-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    private final Serde<Integer> intSerde = Serdes.Integer();
+    private final Serde<String> stringSerde = Serdes.String();
 
     @Test
     public void testForeach() {
@@ -75,9 +101,9 @@ public class KStreamForeachTest {
         stream.foreach(action);
 
         // Then
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (KeyValue<Integer, String> record: inputRecords) {
-            driver.process(topicName, record.key, record.value);
+            driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
         }
 
         assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index d1d048b..2936f5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -16,46 +16,46 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
 public class KStreamGlobalKTableJoinTest {
 
-    final private String streamTopic = "streamTopic";
-    final private String globalTableTopic = "globalTableTopic";
-
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
+    private final String streamTopic = "streamTopic";
+    private final String globalTableTopic = "globalTableTopic";
+    private final Serde<Integer> intSerde = Serdes.Integer();
+    private final Serde<String> stringSerde = Serdes.String();
+    private TopologyTestDriver driver;
     private MockProcessorSupplier<Integer, String> processor;
     private final int[] expectedKeys = {0, 1, 2, 3};
     private StreamsBuilder builder;
 
     @Before
     public void setUp() throws IOException {
-        stateDir = TestUtils.tempDirectory("kafka-test");
 
         builder = new StreamsBuilder();
         final KStream<Integer, String> stream;
@@ -78,29 +78,46 @@ public class KStreamGlobalKTableJoinTest {
         };
         stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        driver = new TopologyTestDriver(builder.build(), props);
+    }
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+        final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
             String value = valuePrefix + expectedKeys[i];
             if (includeForeignKey) {
                 value = value + ",FKey" + expectedKeys[i];
             }
-            driver.process(streamTopic, expectedKeys[i], value);
+            driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], value));
         }
     }
 
     private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]));
         }
     }
 
     private void pushNullValueToGlobalTable(final int messageCount) {
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
         }
     }
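One difference from the old driver worth calling out here: process() accepted
plain Java objects, while pipeInput() takes records that are already
serialized, so a test that feeds topics with different key types keeps one
ConsumerRecordFactory per topic, as the helpers above do. A sketch with the
two topics from this test, assuming the driver from setUp():

    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    // the stream is keyed by Integer, the global table by String
    final ConsumerRecordFactory<Integer, String> streamFactory =
            new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
    final ConsumerRecordFactory<String, String> tableFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

    driver.pipeInput(streamFactory.create("streamTopic", 0, "X0,FKey0"));
    driver.pipeInput(tableFactory.create("globalTableTopic", "FKey0", "Y0"));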
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 8b7dd42..8882113 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -16,25 +16,28 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -43,19 +46,15 @@ public class KStreamGlobalKTableLeftJoinTest {
 
     final private String streamTopic = "streamTopic";
     final private String globalTableTopic = "globalTableTopic";
-
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
+    private TopologyTestDriver driver;
     private MockProcessorSupplier<Integer, String> processor;
     private final int[] expectedKeys = {0, 1, 2, 3};
     private StreamsBuilder builder;
 
     @Before
     public void setUp() throws IOException {
-        stateDir = TestUtils.tempDirectory("kafka-test");
 
         builder = new StreamsBuilder();
         final KStream<Integer, String> stream;
@@ -78,29 +77,38 @@ public class KStreamGlobalKTableLeftJoinTest {
         };
         stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        driver = new TopologyTestDriver(builder.build(), props);
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+        final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
             String value = valuePrefix + expectedKeys[i];
             if (includeForeignKey) {
                 value = value + ",FKey" + expectedKeys[i];
             }
-            driver.process(streamTopic, expectedKeys[i], value);
+            driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], value));
         }
     }
 
     private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]));
         }
     }
 
     private void pushNullValueToGlobalTable(final int messageCount) {
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+            driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index ef65bb3..f397246 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -42,16 +45,18 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
@@ -71,13 +76,29 @@ public class KStreamImplTest {
     private StreamsBuilder builder;
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
     @Before
     public void before() {
         builder = new StreamsBuilder();
         testStream = builder.stream("source");
+
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-impl-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @Test
@@ -204,8 +225,8 @@ public class KStreamImplTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.process(input, "a", "b");
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(input, "a", "b"));
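+        // the driver feeds records produced to "through-topic" straight back
+        // into the topology's matching source, so the processor sees them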
         assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b")));
     }
 
@@ -218,8 +239,8 @@ public class KStreamImplTest {
         stream.to("to-topic", Produced.with(stringSerde, stringSerde));
         builder.stream("to-topic", consumed).process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.process(input, "e", "f");
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create(input, "e", "f"));
         assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f")));
     }
 
@@ -501,13 +522,12 @@ public class KStreamImplTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
-        driver.process(topic1, "A", "aa");
-        driver.process(topic2, "B", "bb");
-        driver.process(topic2, "C", "cc");
-        driver.process(topic1, "D", "dd");
+        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
     }
@@ -528,17 +548,16 @@ public class KStreamImplTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
-        driver.process(topic1, "A", "aa");
-        driver.process(topic2, "B", "bb");
-        driver.process(topic3, "C", "cc");
-        driver.process(topic4, "D", "dd");
-        driver.process(topic4, "E", "ee");
-        driver.process(topic3, "F", "ff");
-        driver.process(topic2, "G", "gg");
-        driver.process(topic1, "H", "hh");
+        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+        driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
+        driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
+        driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
+        driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
+        driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
+        driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
                      processorSupplier.processed);
@@ -551,14 +570,13 @@ public class KStreamImplTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         pattern2Source.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
-        driver.process("topic-3", "A", "aa");
-        driver.process("topic-4", "B", "bb");
-        driver.process("topic-5", "C", "cc");
-        driver.process("topic-6", "D", "dd");
-        driver.process("topic-7", "E", "ee");
+        driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+        driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+        driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
+        driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
+        driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
                 processorSupplier.processed);
@@ -576,14 +594,13 @@ public class KStreamImplTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver.setUp(builder);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
-        driver.process("topic-3", "A", "aa");
-        driver.process("topic-4", "B", "bb");
-        driver.process("topic-A", "C", "cc");
-        driver.process("topic-Z", "D", "dd");
-        driver.process(topic3, "E", "ee");
+        driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+        driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+        driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
+        driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
+        driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
                 processorSupplier.processed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 4d2bce5..63a040a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -16,30 +16,32 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -55,14 +57,27 @@ public class KStreamKStreamJoinTest {
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
     private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
     @Before
     public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-join-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @Test
@@ -71,6 +86,7 @@ public class KStreamKStreamJoinTest {
 
         final KStream<String, Integer> left = builder.stream("left", Consumed.with(stringSerde, intSerde));
         final KStream<String, Integer> right = builder.stream("right", Consumed.with(stringSerde, intSerde));
+        final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
 
         left.join(
             right,
@@ -85,13 +101,13 @@ public class KStreamKStreamJoinTest {
         );
 
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.setUp(builder, stateDir);
-        driver.process("left", "A", null);
+        driver = new TopologyTestDriver(builder.build(), props);
+        driver.pipeInput(recordFactory.create("left", "A", null));
         LogCaptureAppender.unregister(appender);
 
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[-1] offset=[-1]"));
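+        // TopologyTestDriver attaches real metadata to piped records,
+        // hence partition 0 / offset 0 below instead of the mock context's -1 / -1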
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
 
-        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
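+        // skipped-record metrics are now read from driver.metrics() instead of
+        // reaching into the processor context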
+        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
     }
 
     @Test
@@ -120,8 +136,7 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props);
 
         // push two items to the primary stream. the other window is empty
         // w1 = {}
@@ -130,7 +145,7 @@ public class KStreamKStreamJoinTest {
         //     w2 = {}
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
         }
 
         processor.checkAndClearProcessResult();
@@ -142,7 +157,7 @@ public class KStreamKStreamJoinTest {
         //     w2 = { 0:Y0, 1:Y1 }
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -153,8 +168,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1 }
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -165,8 +180,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -177,8 +192,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
@@ -190,7 +205,7 @@ public class KStreamKStreamJoinTest {
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
         }
 
         processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
@@ -222,8 +237,7 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
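+        // the third constructor argument fixes the driver's initial wall-clock time at 0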
 
         // push two items to the primary stream. the other window is empty.this should produce two items
         // w1 = {}
@@ -232,7 +246,7 @@ public class KStreamKStreamJoinTest {
         //     w2 = {}
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
         }
 
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
@@ -244,7 +258,7 @@ public class KStreamKStreamJoinTest {
         //     w2 = { 0:Y0, 1:Y1 }
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -255,8 +269,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1 }
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
@@ -267,8 +281,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -279,8 +293,8 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
@@ -292,7 +306,7 @@ public class KStreamKStreamJoinTest {
         //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
         }
 
         processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
@@ -327,17 +341,15 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
-
+        driver = new TopologyTestDriver(builder.build(), props, time);
 
         // push two items to the primary stream. the other window is empty. this should produce no items.
         // w1 = {}
         // w2 = {}
         // --> w1 = { 0:X0, 1:X1 }
         //     w2 = {}
-        setRecordContext(time, topic1);
         for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
         }
 
         processor.checkAndClearProcessResult();
@@ -348,58 +360,53 @@ public class KStreamKStreamJoinTest {
         // --> w1 = { 0:X0, 1:X1 }
         //     w2 = { 0:Y0, 1:Y1 }
 
-        setRecordContext(time, topic2);
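+        // the record timestamp below now plays the role setRecordContext() used to play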
         for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
         }
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
         // clear logically
         time = 1000L;
-        setRecordContext(time, topic1);
         for (int i = 0; i < expectedKeys.length; i++) {
-            setRecordContext(time + i, topic1);
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
         }
         processor.checkAndClearProcessResult();
 
         // gradually expires items in w1
         // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
 
-        time = 1000 + 100L;
-        setRecordContext(time, topic2);
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 100L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
@@ -407,37 +414,36 @@ public class KStreamKStreamJoinTest {
         // go back to the time before expiration
 
         time = 1000L - 100L - 1L;
-        setRecordContext(time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -445,8 +451,7 @@ public class KStreamKStreamJoinTest {
         // clear (logically)
         time = 2000L;
         for (int i = 0; i < expectedKeys.length; i++) {
-            setRecordContext(time + i, topic2);
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
         }
 
         processor.checkAndClearProcessResult();
@@ -455,37 +460,36 @@ public class KStreamKStreamJoinTest {
         // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
         time = 2000L + 100L;
-        setRecordContext(time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
@@ -493,37 +497,36 @@ public class KStreamKStreamJoinTest {
         // go back to the time before expiration
 
         time = 2000L - 100L - 1L;
-        setRecordContext(time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0");
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
 
-        setRecordContext(++time, topic1);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
@@ -560,85 +563,80 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
+        driver = new TopologyTestDriver(builder.build(), props, time);
 
         for (int i = 0; i < expectedKeys.length; i++) {
-            setRecordContext(time + i, topic1);
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
         }
         processor.checkAndClearProcessResult();
 
 
         time = 1000L - 1L;
-        setRecordContext(time, topic2);
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
         time = 1000 + 100L;
-        setRecordContext(time, topic2);
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
@@ -673,90 +671,82 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
+        driver = new TopologyTestDriver(builder.build(), props, time);
 
         for (int i = 0; i < expectedKeys.length; i++) {
-            setRecordContext(time + i, topic1);
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
         }
         processor.checkAndClearProcessResult();
 
 
         time = 1000L - 100L - 1L;
-
-        setRecordContext(time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-        time = 1000L;
 
-        setRecordContext(time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time = 1000L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult("3:X3+YY3");
 
-        setRecordContext(++time, topic2);
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+        time += 1L;
+        for (int expectedKey : expectedKeys) {
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
         }
 
         processor.checkAndClearProcessResult();
     }
-
-    private void setRecordContext(final long time, final String topic) {
-        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
-    }
 }
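
The hunks above replace the old setRecordContext(...) helper, which reached into the mock processor context by hand, with timestamps carried on the input records themselves: TopologyTestDriver derives stream time from each record it pipes. A minimal sketch of the pattern, reusing the builder and props set up in the test above (topic name, keys, and values are illustrative):

    ConsumerRecordFactory<Integer, String> factory =
        new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L);

    // event time travels with the record; no processor-context mutation needed
    driver.pipeInput(factory.create("topic1", 0, "X0", 100L));
    driver.pipeInput(factory.create("topic1", 1, "X1", 101L)); // stream time advances to 101
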
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 465082b..cb1aaf1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -16,29 +16,30 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -50,15 +51,27 @@ public class KStreamKStreamLeftJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
     private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
-
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
     @Before
-    public void setUp() throws IOException {
-        stateDir = TestUtils.tempDirectory("kafka-test");
+    public void setUp() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-left-join-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @Test
@@ -87,8 +100,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
-        driver.setTime(0L);
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         // push two items to the primary stream. the other window is empty
         // w1 {}
@@ -97,9 +109,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w2 = {}
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
         // push two items to the other stream. this should produce two items.
@@ -109,9 +120,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w2 = { 0:Y0, 1:Y1 }
 
         for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
         }
-        driver.flushState();
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
@@ -122,9 +132,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w2 = { 0:Y0, 1:Y1 }
 
         for (int i = 0; i < 3; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
 
         // push all items to the other stream. this should produce 5 items
@@ -134,9 +143,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
 
         for (int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
 
         // push all four items to the primary stream. this should produce six items.
@@ -146,9 +154,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
 
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
     }
 
@@ -178,7 +185,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
+        driver = new TopologyTestDriver(builder.build(), props, time);
 
         // push two items to the primary stream. the other window is empty. this should produce two items
         // w1 = {}
@@ -186,11 +193,9 @@ public class KStreamKStreamLeftJoinTest {
         // --> w1 = { 0:X0, 1:X1 }
         // --> w2 = {}
 
-        setRecordContext(time, topic1);
         for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
         // push two items to the other stream. this should produce no items.
@@ -199,16 +204,13 @@ public class KStreamKStreamLeftJoinTest {
         // --> w1 = { 0:X0, 1:X1 }
         // --> w2 = { 0:Y0, 1:Y1 }
 
-        setRecordContext(time, topic2);
         for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
         // clear logically
         time = 1000L;
-        setRecordContext(time, topic2);
 
         // push all items to the other stream. this should produce no items.
         // w1 = {}
@@ -216,10 +218,8 @@ public class KStreamKStreamLeftJoinTest {
         // --> w1 = {}
         // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
         for (int i = 0; i < expectedKeys.length; i++) {
-            setRecordContext(time + i, topic2);
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult();
 
         // gradually expire items in window 2.
@@ -229,81 +229,65 @@ public class KStreamKStreamLeftJoinTest {
         // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
         time = 1000L + 100L;
-        setRecordContext(time, topic1);
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
         // go back to the time before expiration
 
         time = 1000L - 100L - 1L;
-        setRecordContext(time, topic1);
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
 
-        setRecordContext(++time, topic1);
+        time += 1L;
         for (int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
         }
-        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
     }
-
-    private void setRecordContext(final long time, final String topic) {
-        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
-    }
 }
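
Note the driver.flushState() deletions throughout this file: TopologyTestDriver processes each record through the whole topology synchronously inside pipeInput, so assertions can follow immediately. If a test routed the join result to an output topic instead of a MockProcessorSupplier, it could be read back directly; a sketch, assuming a hypothetical sink topic "out" (OutputVerifier and ProducerRecord come from org.apache.kafka.streams.test and org.apache.kafka.clients.producer):

    driver.pipeInput(recordFactory.create(topic1, 0, "X0"));
    final ProducerRecord<Integer, String> output =
        driver.readOutput("out", new IntegerDeserializer(), new StringDeserializer());
    OutputVerifier.compareKeyValue(output, 0, "X0+null"); // expected key, then expected value
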
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 1800385..5b2a797 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -16,26 +16,29 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -45,24 +48,21 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamKTableJoinTest {
 
-    final private String streamTopic = "streamTopic";
-    final private String tableTopic = "tableTopic";
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final Serde<Integer> intSerde = Serdes.Integer();
+    private final Serde<String> stringSerde = Serdes.String();
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
     private MockProcessorSupplier<Integer, String> processor;
     private final int[] expectedKeys = {0, 1, 2, 3};
     private StreamsBuilder builder;
 
     @Before
     public void setUp() {
-        final File stateDir = TestUtils.tempDirectory("kafka-test");
-
         builder = new StreamsBuilder();
 
-
         final KStream<Integer, String> stream;
         final KTable<Integer, String> table;
 
@@ -72,25 +72,31 @@ public class KStreamKTableJoinTest {
         table = builder.table(tableTopic, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
-            driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
         }
     }
 
     private void pushToTable(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
-            driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
         }
     }
 
     private void pushNullValueToTable() {
         for (int i = 0; i < 2; i++) {
-            driver.process(tableTopic, expectedKeys[i], null);
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
         }
     }
 
@@ -188,20 +194,20 @@ public class KStreamKTableJoinTest {
     @Test
     public void shouldLogAndMeterWhenSkippingNullLeftKey() {
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.process(streamTopic, null, "A");
+        driver.pipeInput(recordFactory.create(streamTopic, null, "A"));
         LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic] partition=[-1] offset=[-1]"));
+        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic] partition=[0] offset=[0]"));
     }
 
     @Test
     public void shouldLogAndMeterWhenSkippingNullLeftValue() {
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.process(streamTopic, 1, null);
+        driver.pipeInput(recordFactory.create(streamTopic, 1, null));
         LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[-1] offset=[-1]"));
+        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[0] offset=[0]"));
     }
 }
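
Two details behind the assertion changes above: driver.metrics() exposes the same registry that driver.context().metrics().metrics() used to dig out of internal state, and records piped through TopologyTestDriver carry a real partition (0) and increasing offsets rather than -1/-1, hence the updated log expectations. A small sketch of scanning the metrics map directly:

    // driver.metrics() returns Map<MetricName, ? extends Metric>
    for (final Map.Entry<MetricName, ? extends Metric> entry : driver.metrics().entrySet()) {
        if ("skipped-records-total".equals(entry.getKey().name())) {
            System.out.println(entry.getKey().group() + " = " + entry.getValue().metricValue());
        }
    }
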
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 7507d7a..669f4c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -16,26 +16,28 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -47,17 +49,14 @@ public class KStreamKTableLeftJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
     private MockProcessorSupplier<Integer, String> processor;
     private final int[] expectedKeys = {0, 1, 2, 3};
     private StreamsBuilder builder;
 
     @Before
-    public void setUp() throws IOException {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-
+    public void setUp() {
         builder = new StreamsBuilder();
 
 
@@ -70,25 +69,31 @@ public class KStreamKTableLeftJoinTest {
         table = builder.table(tableTopic, consumed);
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
-            driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
         }
     }
 
     private void pushToTable(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
-            driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
         }
     }
 
     private void pushNullValueToTable(final int messageCount) {
         for (int i = 0; i < messageCount; i++) {
-            driver.process(tableTopic, expectedKeys[i], null);
+            driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
         }
     }
 
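
The same five StreamsConfig properties recur in every @Before in this patch. A hypothetical helper (not part of this commit; the application id varies per test class, and the default serdes are hardcoded here for brevity) could centralize them:

    private static Properties testConfig(final String applicationId) {
        final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return props;
    }
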
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index eed7d7d..bb22204 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -16,18 +16,26 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.junit.Assert.assertEquals;
 
 public class KStreamMapTest {
@@ -36,8 +44,27 @@ public class KStreamMapTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testMap() {
@@ -59,9 +86,9 @@ public class KStreamMapTest {
         processor = new MockProcessorSupplier<>();
         stream.map(mapper).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, "V" + expectedKey);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
         assertEquals(4, processor.processed.size());
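
Where earlier hunks advance a time variable by hand between create(...) calls, ConsumerRecordFactory can also auto-advance timestamps on its own. A sketch, assuming one-millisecond spacing is wanted (the extra constructor arguments are the start timestamp and the per-record increment):

    final ConsumerRecordFactory<Integer, String> autoFactory =
        new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L, 1L);
    driver.pipeInput(autoFactory.create(topicName, 0, "V0")); // timestamp 0
    driver.pipeInput(autoFactory.create(topicName, 1, "V1")); // timestamp 1
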
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 2cedfb4..17a13e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -16,18 +16,26 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.junit.Assert.assertArrayEquals;
 
 public class KStreamMapValuesTest {
@@ -36,8 +44,27 @@ public class KStreamMapValuesTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-values-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testFlatMapValues() {
@@ -58,9 +85,9 @@ public class KStreamMapValuesTest {
         stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
         stream.mapValues(mapper).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, Integer.toString(expectedKey));
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
         }
         String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
 
@@ -86,9 +113,9 @@ public class KStreamMapValuesTest {
         stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
         stream.mapValues(mapper).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, Integer.toString(expectedKey));
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
         }
         String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
 
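
pipeInput also accepts a whole batch, and the factory can build one from KeyValue pairs, which would condense input loops like the one above into a single call; a sketch (assumes java.util.Arrays and org.apache.kafka.streams.KeyValue are imported):

    driver.pipeInput(recordFactory.create(topicName, Arrays.asList(
        new KeyValue<>(1, "1"),
        new KeyValue<>(10, "10"),
        new KeyValue<>(100, "100"),
        new KeyValue<>(1000, "1000"))));
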
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 215b0c3..2c6ff81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -16,20 +16,26 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.KStreamTestDriver;
-
-import org.junit.Rule;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -39,8 +45,27 @@ public class KStreamPeekTest {
     private final String topicName = "topic";
     private final Serde<Integer> intSerd = Serdes.Integer();
     private final Serde<String> stringSerd = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-peek-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void shouldObserveStreamElements() {
@@ -49,11 +74,11 @@ public class KStreamPeekTest {
         final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
         stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
         final List<KeyValue<Integer, String>> expected = new ArrayList<>();
         for (int key = 0; key < 32; key++) {
             final String value = "V" + key;
-            driver.process(topicName, key, value);
+            driver.pipeInput(recordFactory.create(topicName, key, value));
             expected.add(new KeyValue<>(key, value));
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index f4f340f..0bf6452 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -16,20 +16,27 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
@@ -39,8 +46,27 @@ public class KStreamSelectKeyTest {
 
     final private Serde<Integer> integerSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(topicName, new StringSerializer(), new IntegerSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-select-key-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testSelectKey() {
@@ -68,10 +94,10 @@ public class KStreamSelectKeyTest {
 
         stream.selectKey(selector).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
 
         for (int expectedValue : expectedValues) {
-            driver.process(topicName, null, expectedValue);
+            driver.pipeInput(recordFactory.create(expectedValue));
         }
 
         assertEquals(3, processor.processed.size());
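
KStreamSelectKeyTest constructs its factory with a default topic bound in, which is why the loop above can call recordFactory.create(expectedValue) with neither a topic nor a key (the key comes out null). A sketch contrasting the two construction styles, using the same serializers as the test:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class FactorySketch {
        public static void main(final String[] args) {
            // Bound to a default topic: create(value) fills in the topic and a null key.
            final ConsumerRecordFactory<String, Integer> bound =
                new ConsumerRecordFactory<>("topic", new StringSerializer(), new IntegerSerializer());
            final ConsumerRecord<byte[], byte[]> r1 = bound.create(1);

            // Unbound: every create(...) call must name the topic explicitly.
            final ConsumerRecordFactory<String, Integer> unbound =
                new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
            final ConsumerRecord<byte[], byte[]> r2 = unbound.create("topic", "key", 1);

            System.out.println(r1.topic() + " / " + r2.topic());  // both print "topic"
        }
    }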
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index df3ceaf..aa0cf7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -16,20 +16,31 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.junit.Assert.assertEquals;
 
 public class KStreamTransformTest {
@@ -37,8 +48,30 @@ public class KStreamTransformTest {
     private String topicName = "topic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
+    private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
     @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    public final KStreamTestDriver kstreamDriver = new KStreamTestDriver();
+
+    @Before
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testTransform() {
@@ -79,13 +112,77 @@ public class KStreamTransformTest {
         KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
         stream.transform(transformerSupplier).process(processor);
 
-        driver.setUp(builder);
+        kstreamDriver.setUp(builder);
+        for (int expectedKey : expectedKeys) {
+            kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
+        }
+
+        kstreamDriver.punctuate(2);
+        kstreamDriver.punctuate(3);
+
+        assertEquals(6, processor.processed.size());
+
+        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processor.processed.get(i));
+        }
+    }
+
+    @Test
+    public void testTransformWithNewDriverAndPunctuator() {
+        StreamsBuilder builder = new StreamsBuilder();
+
+        TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
+            new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+                public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
+                    return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
+
+                        private int total = 0;
+
+                        @Override
+                        public void init(final ProcessorContext context) {
+                            context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                                @Override
+                                public void punctuate(long timestamp) {
+                                    context.forward(-1, (int) timestamp);
+                                }
+                            });
+                        }
+
+                        @Override
+                        public KeyValue<Integer, Integer> transform(Number key, Number value) {
+                            total += value.intValue();
+                            return KeyValue.pair(key.intValue() * 2, total);
+                        }
+
+                        @Override
+                        public KeyValue<Integer, Integer> punctuate(long timestamp) {
+                            return null;
+                        }
+
+                        @Override
+                        public void close() {
+                        }
+                    };
+                }
+            };
+
+        final int[] expectedKeys = {1, 10, 100, 1000};
+
+        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        stream.transform(transformerSupplier).process(processor);
+
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, expectedKey * 10);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
         }
 
-        driver.punctuate(2);
-        driver.punctuate(3);
+        // This tick advances the wall clock to 2 and yields the "-1:2" result
+        driver.advanceWallClockTime(2);
+        // This tick further advances the clock to 3, which leads to the "-1:3" result
+        driver.advanceWallClockTime(1);
 
         assertEquals(6, processor.processed.size());
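
The new test keeps the same expectations as the old punctuate(2)/punctuate(3) calls but drives them through the wall clock that TopologyTestDriver maintains internally: the third constructor argument pins the clock's starting value, and advanceWallClockTime() is what fires PunctuationType.WALL_CLOCK_TIME punctuators. A sketch of just the clock mechanics, with topology, props, and recordFactory assumed to be set up as in the test above:

    // Wall clock starts at 0 ms.
    final TopologyTestDriver driver = new TopologyTestDriver(topology, props, 0L);
    // Record (event) time is attached per record and is independent of the wall clock.
    driver.pipeInput(recordFactory.create("topic", 1, 10, 0L));
    // Each advance fires the schedule(1, WALL_CLOCK_TIME, ...) punctuator once,
    // passing it the new wall-clock time: first 2, then 3.
    driver.advanceWallClockTime(2L);
    driver.advanceWallClockTime(1L);
    driver.close();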
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index dc0b886..59a6a21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -29,11 +32,15 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 
@@ -42,8 +49,27 @@ public class KStreamTransformValuesTest {
     private String topicName = "topic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
+
+    @Before
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testTransform() {
@@ -85,9 +111,10 @@ public class KStreamTransformValuesTest {
         stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
         stream.transformValues(valueTransformerSupplier).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
+
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, expectedKey * 10);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
         }
         String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
 
@@ -128,9 +155,10 @@ public class KStreamTransformValuesTest {
         stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
         stream.transformValues(valueTransformerSupplier).process(processor);
 
-        driver.setUp(builder);
+        driver = new TopologyTestDriver(builder.build(), props);
+
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, expectedKey * 10);
+            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
         }
         String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 7082251..fc31db9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -18,10 +18,13 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -29,20 +32,18 @@ import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
+import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -52,13 +53,26 @@ import static org.junit.Assert.assertEquals;
 public class KStreamWindowAggregateTest {
 
     final private Serde<String> strSerde = Serdes.String();
-    private File stateDir = null;
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
     @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-window-aggregate-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @Test
@@ -74,55 +88,24 @@ public class KStreamWindowAggregateTest {
         final MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
 
-        driver.setUp(builder, stateDir);
-
-        setRecordContext(0, topic1);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        setRecordContext(1, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(2, topic1);
-        driver.process(topic1, "C", "3");
-        driver.flushState();
-        setRecordContext(3, topic1);
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        setRecordContext(4, topic1);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-
-        setRecordContext(5, topic1);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        setRecordContext(6, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(7, topic1);
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        setRecordContext(8, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(9, topic1);
-        driver.process(topic1, "C", "3");
-        driver.flushState();
-        setRecordContext(10, topic1);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        setRecordContext(11, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(12, topic1);
-        driver.flushState();
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        setRecordContext(13, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(14, topic1);
-        driver.process(topic1, "C", "3");
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+        driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+        driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+        driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+        driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+        driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+        driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+        driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+        driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
+        driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
+        driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
 
 
         assertEquals(
@@ -149,10 +132,6 @@ public class KStreamWindowAggregateTest {
         );
     }
 
-    private void setRecordContext(final long time, @SuppressWarnings("SameParameterValue") final String topic) {
-        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
-    }
-
     @Test
     public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -183,23 +162,13 @@ public class KStreamWindowAggregateTest {
             }
         }).toStream().process(proc3);
 
-        driver.setUp(builder, stateDir);
-
-        setRecordContext(0, topic1);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        setRecordContext(1, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(2, topic1);
-        driver.process(topic1, "C", "3");
-        driver.flushState();
-        setRecordContext(3, topic1);
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        setRecordContext(4, topic1);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+        driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+        driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+        driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+        driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
 
         proc1.checkAndClearProcessResult(
             "[A@0/10]:0+1",
@@ -211,21 +180,11 @@ public class KStreamWindowAggregateTest {
         proc2.checkAndClearProcessResult();
         proc3.checkAndClearProcessResult();
 
-        setRecordContext(5, topic1);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        setRecordContext(6, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(7, topic1);
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        setRecordContext(8, topic1);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        setRecordContext(9, topic1);
-        driver.process(topic1, "C", "3");
-        driver.flushState();
+        driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+        driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+        driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+        driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
 
         proc1.checkAndClearProcessResult(
             "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
@@ -237,21 +196,11 @@ public class KStreamWindowAggregateTest {
         proc2.checkAndClearProcessResult();
         proc3.checkAndClearProcessResult();
 
-        setRecordContext(0, topic1);
-        driver.process(topic2, "A", "a");
-        driver.flushState();
-        setRecordContext(1, topic1);
-        driver.process(topic2, "B", "b");
-        driver.flushState();
-        setRecordContext(2, topic1);
-        driver.process(topic2, "C", "c");
-        driver.flushState();
-        setRecordContext(3, topic1);
-        driver.process(topic2, "D", "d");
-        driver.flushState();
-        setRecordContext(4, topic1);
-        driver.process(topic2, "A", "a");
-        driver.flushState();
+        driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
+        driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
+        driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
+        driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
+        driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
 
         proc1.checkAndClearProcessResult();
         proc2.checkAndClearProcessResult(
@@ -268,21 +217,12 @@ public class KStreamWindowAggregateTest {
             "[D@0/10]:0+4+4%0+d",
             "[A@0/10]:0+1+1+1%0+a+a");
 
-        setRecordContext(5, topic1);
-        driver.process(topic2, "A", "a");
-        driver.flushState();
-        setRecordContext(6, topic1);
-        driver.process(topic2, "B", "b");
-        driver.flushState();
-        setRecordContext(7, topic1);
-        driver.process(topic2, "D", "d");
-        driver.flushState();
-        setRecordContext(8, topic1);
-        driver.process(topic2, "B", "b");
-        driver.flushState();
-        setRecordContext(9, topic1);
-        driver.process(topic2, "C", "c");
-        driver.flushState();
+        driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
+        driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
+        driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
+        driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
+        driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
+
         proc1.checkAndClearProcessResult();
         proc2.checkAndClearProcessResult(
             "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
@@ -314,15 +254,13 @@ public class KStreamWindowAggregateTest {
                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(strSerde)
             );
 
-        driver.setUp(builder, stateDir);
+        driver = new TopologyTestDriver(builder.build(), props, 0L);
 
-        setRecordContext(0, topic);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.process(topic, null, "1");
-        driver.flushState();
+        driver.pipeInput(recordFactory.create(topic, null, "1"));
         LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-1] offset=[-1]"));
+        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
     }
 }
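
The largest mechanical change in this file: per-record event time now travels on the ConsumerRecord itself, so the setRecordContext(...) plus flushState() choreography around every input disappears. A sketch of the substitution, with driver and recordFactory as in the tests above, and assuming the hopping windows of size 10 advancing by 5 that the [A@0/10]/[A@5/15] results imply:

    // Old: setRecordContext(5, topic1); driver.process(topic1, "A", "1"); driver.flushState();
    // New: the event timestamp rides along as the fourth argument.
    driver.pipeInput(recordFactory.create("topic1", "A", "1", 0L));  // falls only into window [0,10)
    driver.pipeInput(recordFactory.create("topic1", "A", "1", 5L));  // falls into [0,10) and [5,15)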
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index d8b3a5f..30c0a7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -16,27 +16,31 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
@@ -44,15 +48,28 @@ import static org.junit.Assert.assertEquals;
 public class KTableForeachTest {
 
     final private String topicName = "topic";
-    private File stateDir = null;
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private TopologyTestDriver driver;
+    private final Properties props = new Properties();
 
     @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
+    public void setup() {
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-foreach-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        props.clear();
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
     }
 
     @Test
@@ -91,11 +108,11 @@ public class KTableForeachTest {
         table.foreach(action);
 
         // Then
-        driver.setUp(builder, stateDir);
+        driver = new TopologyTestDriver(builder.build(), props);
+
         for (KeyValue<Integer, String> record: inputRecords) {
-            driver.process(topicName, record.key, record.value);
+            driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
         }
-        driver.flushState();
 
         assertEquals(expectedRecords.size(), actualRecords.size());
         for (int i = 0; i < expectedRecords.size(); i++) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f67b634..f948c2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
@@ -34,7 +36,6 @@ import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -605,16 +606,12 @@ public class TopologyBuilderTest {
 
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource(sourceNodeName, "topic")
-                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
-                        sourceNodeName)
-                .addStateStore(
-                        Stores.create(LocalMockProcessorSupplier.STORE_NAME)
-                                .withStringKeys().withStringValues().inMemory()
-                                .build(), goodNodeName)
-                .addProcessor(badNodeName, new LocalMockProcessorSupplier(),
-                        sourceNodeName);     
+                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+                .addStateStore(Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName)
+                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         try {
-            final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
+            final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, config);
+            driver.pipeInput(new ConsumerRecord<>("topic", 0, 0L, new byte[] {}, new byte[] {}));
             fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
             final String error = e.toString();
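
TopologyTestDriverWrapper is new in this patch; per the commit notes it exists only to reach TopologyTestDriver's protected constructor that accepts an InternalTopologyBuilder, so tests built around internal builders can still run through the public driver. Something on the order of the following (the exact constructor signature is an assumption, not quoted from the patch):

    package org.apache.kafka.streams;

    import java.util.Properties;

    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

    // Hypothetical reconstruction: widens access to the protected constructor.
    public class TopologyTestDriverWrapper extends TopologyTestDriver {
        public TopologyTestDriverWrapper(final InternalTopologyBuilder builder,
                                         final Properties config) {
            super(builder, config);
        }
    }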
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 901fc4b..25468c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
@@ -34,7 +35,6 @@ import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -581,7 +581,7 @@ public class InternalTopologyBuilderTest {
         builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
         
         try {
-            new ProcessorTopologyTestDriver(streamsConfig, builder);
+            new TopologyTestDriverWrapper(builder, config);
             fail("Should have throw StreamsException");
         } catch (final StreamsException e) {
             final String error = e.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index d07274a..a80b25d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -36,8 +37,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -66,26 +67,26 @@ public class ProcessorTopologyTest {
 
     private final TopologyBuilder builder = new TopologyBuilder();
     private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
 
-    private ProcessorTopologyTestDriver driver;
-    private StreamsConfig config;
+    private TopologyTestDriverWrapper driver;
+    private final Properties props = new Properties();
 
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
         final File localState = TestUtils.tempDirectory();
-        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
         props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
-        this.config = new StreamsConfig(props);
     }
 
     @After
     public void cleanup() {
+        props.clear();
         if (driver != null) {
             driver.close();
         }
@@ -122,19 +123,19 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingSimpleTopology() {
-        final int partition = 10;
-        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        int partition = 10;
+        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key4", "value4"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key5", "value5"));
         assertNoOutputRecord(OUTPUT_TOPIC_2);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4", partition);
@@ -144,18 +145,18 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingMultiplexingTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver = new TopologyTestDriverWrapper(createMultiplexingTopology().internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
 
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
 
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key4", "value4"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key5", "value5"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -166,18 +167,18 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingMultiplexByNameTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology().internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
 
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
 
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key4", "value4"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key5", "value5"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -188,12 +189,12 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingStatefulTopology() {
-        final String storeName = "entries";
-        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        String storeName = "entries";
+        driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName).internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
         assertNoOutputRecord(OUTPUT_TOPIC_1);
 
         final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
@@ -214,10 +215,10 @@ public class ProcessorTopologyTest {
         final TopologyBuilder topologyBuilder = this.builder
                 .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
 
-        driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder);
-        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get(storeName);
-        driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver = new TopologyTestDriverWrapper(topologyBuilder.internalTopologyBuilder, props);
+        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+        driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
+        driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
         assertEquals("value1", globalStore.get("key1"));
         assertEquals("value2", globalStore.get("key2"));
     }
@@ -225,23 +226,23 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingSimpleMultiSourceTopology() {
         final int partition = 10;
-        driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder);
+        driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition).internalTopologyBuilder, props);
 
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
-        driver.process(INPUT_TOPIC_2, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_2, "key2", "value2"));
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_1);
     }
 
     @Test
     public void testDrivingForwardToSourceTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver = new TopologyTestDriverWrapper(createForwardToSourceTopology().internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3");
@@ -249,10 +250,10 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingInternalRepartitioningTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology().internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3");
@@ -260,10 +261,10 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC_1, "key3", "value3@3000", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1@1000"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2@2000"));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3@3000"));
         assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER),
                 equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 1000L, "key1", "value1")));
         assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER),
@@ -319,10 +320,10 @@ public class ProcessorTopologyTest {
     @Test
     public void shouldConsiderTimeStamps() {
         final int partition = 10;
-        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+        driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L);
@@ -331,10 +332,10 @@ public class ProcessorTopologyTest {
     @Test
     public void shouldConsiderModifiedTimeStamps() {
         final int partition = 10;
-        driver = new ProcessorTopologyTestDriver(config, createTimestampTopology(partition).internalTopologyBuilder);
-        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
-        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
-        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+        driver = new TopologyTestDriverWrapper(createTimestampTopology(partition).internalTopologyBuilder, props);
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 20L);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 30L);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 3daf051..c93a306 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+@Deprecated
 public class KStreamTestDriver extends ExternalResource {
 
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
deleted file mode 100644
index afd2bb2..0000000
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
-import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.StateDirectory;
-import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
-import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class makes it easier to write tests to verify the behavior of topologies created with a {@link Topology}.
- * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors,
- * and sinks. And because it starts with a {@link Topology}, you can create topologies specific to your tests or you
- * can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real
- * Kafka broker, so the tests execute very quickly with very little overhead.
- * <p>
- * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamsConfig} and a
- * TopologyBuilder, use the driver to supply an input message to the topology, and then use the driver to read and verify any
- * messages output by the topology.
- * <p>
- * Although the driver doesn't use a real Kafka broker, it does simulate Kafka {@link org.apache.kafka.clients.consumer.Consumer}s
- * and {@link org.apache.kafka.clients.producer.Producer}s that read and write raw {@code byte[]} messages. You can either deal
- * with messages that have {@code byte[]} keys and values, or you can supply the {@link Serializer}s and {@link Deserializer}s
- * that the driver can use to convert the keys and values into objects.
- *
- * <h2>Driver setup</h2>
- * <p>
- * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamsConfig}. The
- * configuration needs to be representative of what you'd supply to the real topology, so that means including several key
- * properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
- * (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
- *
- * <pre>
- * StringSerializer strSerializer = new StringSerializer();
- * StringDeserializer strDeserializer = new StringDeserializer();
- * Properties props = new Properties();
- * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- * props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * StreamsConfig config = new StreamsConfig(props);
- * TopologyBuilder builder = ...
- * ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
- * </pre>
- *
- * <h2>Processing messages</h2>
- * <p>
- * Your test can supply new input records on any of the topics that the topology's sources consume. Here's an example of an
- * input message on the topic named {@code input-topic}:
- *
- * <pre>
- * driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
- * </pre>
- *
- * Immediately, the driver will pass the input message through to the appropriate source that consumes the named topic,
- * and will invoke the processor(s) downstream of the source. If your topology's processors forward messages to sinks,
- * your test can then consume these output messages to verify they match the expected outcome. For example, if our topology
- * should have generated 2 messages on {@code output-topic-1} and 1 message on {@code output-topic-2}, then our test can
- * obtain these messages using the {@link #readOutput(String, Deserializer, Deserializer)} method:
- *
- * <pre>
- * ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
- * ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
- * ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
- * </pre>
- *
- * Again, our example topology generates messages with string keys and values, so we supply our string deserializer instance
- * for use on both the keys and values. Your test logic can then verify whether these output records are correct.
- * <p>
- * Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
- * {@link org.apache.kafka.streams.processor.Processor}s.
- *
- * <h2>Processor state</h2>
- * <p>
- * Some processors use Kafka {@link StateStore state storage}, so this driver class provides the {@link #getStateStore(String)}
- * and {@link #getKeyValueStore(String)} methods so that your tests can check the underlying state store(s) used by your
- * topology's processors. In our previous example, after we supplied a single input message and checked the three output messages,
- * our test could also check the key value store to verify the processor correctly added, removed, or updated internal state.
- * Or, our test might have pre-populated some state <em>before</em> submitting the input message, and verified afterward that the
- * processor(s) correctly updated the state.
- */
-public class ProcessorTopologyTestDriver {
-
-    private final static String APPLICATION_ID = "test-driver-application";
-    private final static int PARTITION_ID = 0;
-    private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
-
-    private final ProcessorTopology topology;
-    private final MockProducer<byte[], byte[]> producer;
-    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
-    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
-    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
-    private final Set<String> internalTopics = new HashSet<>();
-    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
-    private StreamTask task;
-    private GlobalStateUpdateTask globalStateTask;
-
-    /**
-     * Create a new test driver instance.
-     *
-     * @param config  the stream configuration for the topology
-     * @param builder the topology builder that will be used to create the topology instance
-     */
-    public ProcessorTopologyTestDriver(final StreamsConfig config,
-                                       final InternalTopologyBuilder builder) {
-        topology = builder.setApplicationId(APPLICATION_ID).build(null);
-        final ProcessorTopology globalTopology = builder.buildGlobalStateTopology();
-
-        // Set up the consumer and producer ...
-        final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
-        producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
-            @Override
-            public List<PartitionInfo> partitionsFor(final String topic) {
-                return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
-            }
-        };
-
-        // Identify internal topics for forwarding in process ...
-        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
-            internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
-        }
-
-        // Set up all of the topic+partition information and subscribe the consumer to each ...
-        for (final String topic : topology.sourceTopics()) {
-            final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
-            partitionsByTopic.put(topic, tp);
-            offsetsByTopicPartition.put(tp, new AtomicLong());
-        }
-
-        consumer.assign(offsetsByTopicPartition.keySet());
-
-        final StateDirectory stateDirectory = new StateDirectory(config, Time.SYSTEM);
-        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final ThreadCache cache = new ThreadCache(new LogContext("mock "), 1024 * 1024, streamsMetrics);
-
-        if (globalTopology != null) {
-            final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
-            final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-            for (final String topicName : globalTopology.sourceTopics()) {
-                final List<PartitionInfo> partitionInfos = new ArrayList<>();
-                partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
-                globalConsumer.updatePartitions(topicName, partitionInfos);
-                final TopicPartition partition = new TopicPartition(topicName, 1);
-                globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
-                globalPartitionsByTopic.put(topicName, partition);
-                offsetsByTopicPartition.put(partition, new AtomicLong());
-            }
-            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(
-                new LogContext("mock "),
-                globalTopology,
-                globalConsumer,
-                stateDirectory,
-                stateRestoreListener,
-                config);
-            final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache);
-            stateManager.setGlobalProcessorContext(globalProcessorContext);
-            globalStateTask = new GlobalStateUpdateTask(
-                globalTopology,
-                globalProcessorContext,
-                stateManager,
-                new LogAndContinueExceptionHandler(),
-                new LogContext());
-            globalStateTask.initialize();
-        }
-
-        if (!partitionsByTopic.isEmpty()) {
-            task = new StreamTask(
-                TASK_ID,
-                partitionsByTopic.values(),
-                topology,
-                consumer,
-                new StoreChangelogReader(
-                    createRestoreConsumer(topology.storeToChangelogTopic()),
-                    new MockStateRestoreListener(),
-                    new LogContext("topology-test-driver ")
-                ),
-                config,
-                streamsMetrics,
-                stateDirectory,
-                cache,
-                new MockTime(),
-                producer
-            );
-            task.initializeStateStores();
-            task.initializeTopology();
-        }
-    }
-
-    /**
-     * Send an input message with the given key, value, and timestamp on the specified topic to the topology, and then commit the message.
-     *
-     * @param topicName the name of the topic on which the message is to be sent
-     * @param key       the raw message key
-     * @param value     the raw message value
-     * @param timestamp the raw message timestamp
-     */
-    public void process(final String topicName,
-                        final byte[] key,
-                        final byte[] value,
-                        final long timestamp) {
-
-        final TopicPartition tp = partitionsByTopic.get(topicName);
-        if (tp != null) {
-            // Add the record ...
-            final long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
-            task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
-            producer.clear();
-
-            // Process the record ...
-            task.process();
-            ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(timestamp, offset, tp.partition(), topicName));
-            task.commit();
-
-            // Capture all the records sent to the producer ...
-            for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
-                Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
-                if (outputRecords == null) {
-                    outputRecords = new LinkedList<>();
-                    outputRecordsByTopic.put(record.topic(), outputRecords);
-                }
-                outputRecords.add(record);
-
-                // Forward back into the topology if the produced record is to an internal or a source topic ...
-                if (internalTopics.contains(record.topic()) || topology.sourceTopics().contains(record.topic())) {
-                    process(record.topic(), record.key(), record.value(), record.timestamp());
-                }
-            }
-        } else {
-            final TopicPartition global = globalPartitionsByTopic.get(topicName);
-            if (global == null) {
-                throw new IllegalArgumentException("Unexpected topic: " + topicName);
-            }
-            final long offset = offsetsByTopicPartition.get(global).incrementAndGet();
-            globalStateTask.update(new ConsumerRecord<>(global.topic(), global.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value));
-            globalStateTask.flushState();
-        }
-    }
-
-    /**
-     * Send an input message with the given key and value on the specified topic to the topology.
-     *
-     * @param topicName the name of the topic on which the message is to be sent
-     * @param key       the raw message key
-     * @param value     the raw message value
-     */
-    public void process(final String topicName,
-                        final byte[] key,
-                        final byte[] value) {
-        process(topicName, key, value, 0L);
-    }
-
-    /**
-     * Send an input message with the given key and value on the specified topic to the topology.
-     *
-     * @param topicName       the name of the topic on which the message is to be sent
-     * @param key             the message key
-     * @param value           the message value
-     * @param keySerializer   the serializer for the key
-     * @param valueSerializer the serializer for the value
-     */
-    public <K, V> void process(final String topicName,
-                               final K key,
-                               final V value,
-                               final Serializer<K> keySerializer,
-                               final Serializer<V> valueSerializer) {
-        process(topicName, key, value, keySerializer, valueSerializer, 0L);
-    }
-
-    /**
-     * Send an input message with the given key, value, and timestamp on the specified topic to the topology.
-     *
-     * @param topicName       the name of the topic on which the message is to be sent
-     * @param key             the message key
-     * @param value           the message value
-     * @param keySerializer   the serializer for the key
-     * @param valueSerializer the serializer for the value
-     * @param timestamp       the message timestamp
-     */
-    public <K, V> void process(final String topicName,
-                               final K key,
-                               final V value,
-                               final Serializer<K> keySerializer,
-                               final Serializer<V> valueSerializer,
-                               final long timestamp) {
-        process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value), timestamp);
-    }
-
-    /**
-     * Read the next record from the given topic. These records were output by the topology during the previous calls to
-     * {@link #process(String, byte[], byte[])}.
-     *
-     * @param topic the name of the topic
-     * @return the next record on that topic, or null if there is no record available
-     */
-    public ProducerRecord<byte[], byte[]> readOutput(final String topic) {
-        final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
-        if (outputRecords == null) {
-            return null;
-        }
-        return outputRecords.poll();
-    }
-
-    /**
-     * Read the next record from the given topic. These records were output by the topology during the previous calls to
-     * {@link #process(String, byte[], byte[])}.
-     *
-     * @param topic             the name of the topic
-     * @param keyDeserializer   the deserializer for the key type
-     * @param valueDeserializer the deserializer for the value type
-     * @return the next record on that topic, or null if there is no record available
-     */
-    public <K, V> ProducerRecord<K, V> readOutput(final String topic,
-                                                  final Deserializer<K> keyDeserializer,
-                                                  final Deserializer<V> valueDeserializer) {
-        final ProducerRecord<byte[], byte[]> record = readOutput(topic);
-        if (record == null) {
-            return null;
-        }
-        final K key = keyDeserializer.deserialize(record.topic(), record.key());
-        final V value = valueDeserializer.deserialize(record.topic(), record.value());
-        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
-    }
-
-    private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]> record) {
-        return Collections.singleton(record);
-    }
-
-    /**
-     * Get the {@link StateStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is
-     * presumed to be used by a Processor within the topology.
-     * <p>
-     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
-     * {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
-     *
-     * @param name the name of the store
-     * @return the state store, or null if no store has been registered with the given name
-     * @see #getKeyValueStore(String)
-     */
-    public StateStore getStateStore(final String name) {
-        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
-    }
-
-    /**
-     * Get the {@link KeyValueStore} with the given name. The name should have been supplied via
-     * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is
-     * presumed to be used by a Processor within the topology.
-     * <p>
-     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
-     * {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
-     * <p>
-     *
-     * @param name the name of the store
-     * @return the key value store, or null if no {@link KeyValueStore} has been registered with the given name
-     * @see #getStateStore(String)
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
-        final StateStore store = getStateStore(name);
-        return store instanceof KeyValueStore ? (KeyValueStore<K, V>) getStateStore(name) : null;
-    }
-
-    /**
-     * Close the driver, its topology, and all processors.
-     */
-    public void close() {
-        if (task != null) {
-            task.close(true, false);
-        }
-        if (globalStateTask != null) {
-            try {
-                globalStateTask.close();
-            } catch (final IOException e) {
-                // ignore
-            }
-        }
-    }
-
-    /**
-     * Utility method that creates the {@link MockConsumer} used for restoring state. This driver performs no actual
-     * state restoration unless this method is overridden to return a functional consumer.
-     *
-     * @param storeToChangelogTopic the map of the names of the stores to the changelog topics
-     * @return the mock consumer; never null
-     */
-    private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
-            @Override
-            public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
-
-            @Override
-            public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
-
-            @Override
-            public synchronized long position(final TopicPartition partition) {
-                return 0L;
-            }
-        };
-        // For each store ...
-        for (final Map.Entry<String, String> storeAndTopic : storeToChangelogTopic.entrySet()) {
-            final String topicName = storeAndTopic.getValue();
-            // Set up the restore-state topic ...
-            // consumer.subscribe(new TopicPartition(topicName, 1));
-            // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
-            final List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
-            consumer.updatePartitions(topicName, partitionInfos);
-            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
-        }
-        return consumer;
-    }
-
-    private MockConsumer<byte[], byte[]> createGlobalConsumer() {
-        return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
-            @Override
-            public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
-
-            @Override
-            public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
-
-            @Override
-            public synchronized long position(final TopicPartition partition) {
-                return 0L;
-            }
-        };
-    }
-
-}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index bde70b4..7343d59 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -216,10 +216,34 @@ public class TopologyTestDriver implements Closeable {
     public TopologyTestDriver(final Topology topology,
                               final Properties config,
                               final long initialWallClockTimeMs) {
+        this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
+    }
+
+    /**
+     * Create a new test driver instance.
+     *
+     * @param builder builder for the topology to be tested
+     * @param config the configuration for the topology
+     */
+    protected TopologyTestDriver(final InternalTopologyBuilder builder,
+                                 final Properties config) {
+        this(builder, config, System.currentTimeMillis());
+    }
+
+    /**
+     * Create a new test driver instance.
+     *
+     * @param builder builder for the topology to be tested
+     * @param config the configuration for the topology
+     * @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
+     */
+    private TopologyTestDriver(final InternalTopologyBuilder builder,
+                               final Properties config,
+                               final long initialWallClockTimeMs) {
         final StreamsConfig streamsConfig = new StreamsConfig(config);
         mockTime = new MockTime(initialWallClockTimeMs);
 
-        internalTopologyBuilder = topology.internalTopologyBuilder;
+        internalTopologyBuilder = builder;
         internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
 
         processorTopology = internalTopologyBuilder.build(null);

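For readers migrating off the removed driver, below is a minimal, self-contained sketch of the same process()/readOutput() flow against the public TopologyTestDriver. The pass-through topology, topic names, and application id are illustrative placeholders, not part of this patch:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class PassThroughTopologyTest {

        public static void main(final String[] args) {
            // A trivial pass-through topology stands in for the topology under test.
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic-1");
            final Topology topology = builder.build();

            final Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-driver-application");
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); // required but never contacted
            props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

            // pipeInput()/readOutput() replace the removed driver's process()/readOutput() pair.
            driver.pipeInput(factory.create("input-topic", "key1", "value1"));
            final ProducerRecord<String, String> output =
                driver.readOutput("output-topic-1", new StringDeserializer(), new StringDeserializer());
            System.out.println(output.key() + " -> " + output.value());

            driver.close();
        }
    }

The protected constructor added above exists so that internal tests can hand an InternalTopologyBuilder to the driver directly instead of going through a public Topology.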
-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.