You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/13 17:28:17 UTC

[kafka] branch trunk updated: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2] (#4986)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de4f4f5  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2] (#4986)
de4f4f5 is described below

commit de4f4f530a92473edb3665c94ac386d5c53cc893
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Wed Jun 13 18:27:35 2018 +0100

    KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2] (#4986)
    
    * KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2]
    
    * Refactor:
      -KTableFilterTest.java
      -KTableImplTest.java
      -KTableMapValuesTest.java
      -KTableSourceTest.java
    
    * Add access to task, processorTopology, and globalTopology in TopologyTestDriver via TopologyTestDriverWrapper
    * Remove unnecessary constructor in TopologyTestDriver
    
    * Change how TopologyTestDriverWrapper#getProcessorContext sets the current node
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/streams/TopologyTestDriverWrapper.java   |  70 +++++
 .../kstream/internals/KTableFilterTest.java        | 249 +++++++++--------
 .../streams/kstream/internals/KTableImplTest.java  | 256 ++++++++---------
 .../kstream/internals/KTableMapValuesTest.java     | 308 ++++++++++-----------
 .../kstream/internals/KTableSourceTest.java        | 190 +++++++------
 .../apache/kafka/streams/TopologyTestDriver.java   |   7 +-
 6 files changed, 576 insertions(+), 504 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
new file mode 100644
index 0000000..bec4b5f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Properties;
+
+/**
+ * Exposes internal state of {@link TopologyTestDriver} (its task and its processor
+ * topologies) to tests. It should only be used for internal testing, on the rare
+ * occasions where the necessary functionality is not supported by {@link TopologyTestDriver}.
+ */
+public class TopologyTestDriverWrapper extends TopologyTestDriver {
+
+    public TopologyTestDriverWrapper(final Topology topology, final Properties config) {
+        super(topology, config);
+    }
+
+    /**
+     * Set the named processor as current node of the task's processor context, and return that context
+     *
+     * @param processorName processor name to set as current node
+     * @return the processor context
+     */
+    public ProcessorContext setCurrentNodeForProcessorContext(final String processorName) {
+        final ProcessorContext context = task.context();
+        ((ProcessorContextImpl) context).setCurrentNode(getProcessor(processorName));
+        return context;
+    }
+
+    /**
+     * Get a processor by name, throwing a {@link StreamsException} if no processor matches
+     *
+     * @param name the name to search for
+     * @return the processor matching the search name
+     */
+    public ProcessorNode getProcessor(final String name) {
+        for (final ProcessorNode node : processorTopology.processors()) {
+            if (node.name().equals(name)) {
+                return node;
+            }
+        }
+        if (globalTopology != null) { // may be null: topology without global stores
+            for (final ProcessorNode node : globalTopology.processors()) {
+                if (node.name().equals(name)) {
+                    return node;
+                }
+            }
+        }
+        throw new StreamsException("Could not find a processor named '" + name + "'");
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index c37078d..2cf192b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -16,45 +16,40 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.List;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class KTableFilterTest {
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, Integer> consumed = Consumed.with(stringSerde, intSerde);
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
+    private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
 
     private void doTestKTable(final StreamsBuilder builder,
                               final KTable<String, Integer> table2,
@@ -64,16 +59,14 @@ public class KTableFilterTest {
         table2.toStream().process(supplier);
         table3.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
-
-        driver.process(topic, "A", 1);
-        driver.process(topic, "B", 2);
-        driver.process(topic, "C", 3);
-        driver.process(topic, "D", 4);
-        driver.flushState();
-        driver.process(topic, "A", null);
-        driver.process(topic, "B", null);
-        driver.flushState();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, "A", 1));
+            driver.pipeInput(recordFactory.create(topic, "B", 2));
+            driver.pipeInput(recordFactory.create(topic, "C", 3));
+            driver.pipeInput(recordFactory.create(topic, "D", 4));
+            driver.pipeInput(recordFactory.create(topic, "A", null));
+            driver.pipeInput(recordFactory.create(topic, "B", null));
+        }
 
         final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
 
@@ -136,63 +129,68 @@ public class KTableFilterTest {
                                    final KTableImpl<String, Integer, Integer> table2,
                                    final KTableImpl<String, Integer, Integer> table3,
                                    final String topic1) {
+
+        final Topology topology = builder.build();
+
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+        final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
 
-        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
 
-        getter2.init(driver.context());
-        getter3.init(driver.context());
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
 
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 1);
-        driver.process(topic1, "C", 1);
+            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
 
-        assertNull(getter2.get("A"));
-        assertNull(getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", 1));
+            driver.pipeInput(recordFactory.create(topic1, "B", 1));
+            driver.pipeInput(recordFactory.create(topic1, "C", 1));
 
-        assertEquals(1, (int) getter3.get("A"));
-        assertEquals(1, (int) getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
 
-        driver.process(topic1, "A", 2);
-        driver.process(topic1, "B", 2);
-        driver.flushState();
+            assertEquals(1, (int) getter3.get("A"));
+            assertEquals(1, (int) getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
 
-        assertEquals(2, (int) getter2.get("A"));
-        assertEquals(2, (int) getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", 2));
+            driver.pipeInput(recordFactory.create(topic1, "B", 2));
 
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertEquals(2, (int) getter2.get("A"));
+            assertEquals(2, (int) getter2.get("B"));
+            assertNull(getter2.get("C"));
 
-        driver.process(topic1, "A", 3);
-        driver.flushState();
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
 
-        assertNull(getter2.get("A"));
-        assertEquals(2, (int) getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", 3));
 
-        assertEquals(3, (int) getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertEquals(2, (int) getter2.get("B"));
+            assertNull(getter2.get("C"));
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+            assertEquals(3, (int) getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
 
-        assertNull(getter2.get("A"));
-        assertNull(getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", null));
+            driver.pipeInput(recordFactory.create(topic1, "B", null));
 
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
+        }
     }
 
     @Test
@@ -259,34 +257,34 @@ public class KTableFilterTest {
         builder.build().addProcessor("proc1", supplier, table1.name);
         builder.build().addProcessor("proc2", supplier, table2.name);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
 
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 1);
-        driver.process(topic1, "C", 1);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", 1));
+            driver.pipeInput(recordFactory.create(topic1, "B", 1));
+            driver.pipeInput(recordFactory.create(topic1, "C", 1));
 
-        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+            final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+            processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
-        processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
-
-        driver.process(topic1, "A", 2);
-        driver.process(topic1, "B", 2);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
-
-        driver.process(topic1, "A", 3);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(3<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-null)");
-
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+            driver.pipeInput(recordFactory.create(topic1, "A", 2));
+            driver.pipeInput(recordFactory.create(topic1, "B", 2));
+
+            processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", 3));
+
+            processors.get(0).checkAndClearProcessResult("A:(3<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-null)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", null));
+            driver.pipeInput(recordFactory.create(topic1, "B", null));
+
+            processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        }
     }
 
 
@@ -340,34 +338,34 @@ public class KTableFilterTest {
         topology.addProcessor("proc1", supplier, table1.name);
         topology.addProcessor("proc2", supplier, table2.name);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
 
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 1);
-        driver.process(topic1, "C", 1);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", 1));
+            driver.pipeInput(recordFactory.create(topic1, "B", 1));
+            driver.pipeInput(recordFactory.create(topic1, "C", 1));
 
-        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+            final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
 
-        processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-        processors.get(1).checkEmptyAndClearProcessResult();
+            processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            processors.get(1).checkEmptyAndClearProcessResult();
+
+            driver.pipeInput(recordFactory.create(topic1, "A", 2));
+            driver.pipeInput(recordFactory.create(topic1, "B", 2));
+
+            processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+            processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", 3));
+
+            processors.get(0).checkAndClearProcessResult("A:(3<-2)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-2)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", null));
+            driver.pipeInput(recordFactory.create(topic1, "B", null));
 
-        driver.process(topic1, "A", 2);
-        driver.process(topic1, "B", 2);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
-        processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
-
-        driver.process(topic1, "A", 3);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(3<-2)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-2)");
-
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
-        processors.get(1).checkAndClearProcessResult("B:(null<-2)");
+            processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+            processors.get(1).checkAndClearProcessResult("B:(null<-2)");
+        }
     }
 
     @Test
@@ -418,12 +416,13 @@ public class KTableFilterTest {
         topology.addProcessor("proc1", supplier, table1.name);
         topology.addProcessor("proc2", supplier, table2.name);
 
-        driver.setUp(builder, stateDir, stringSerde, stringSerde);
+        final ConsumerRecordFactory<String, String> stringRecordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
 
-        driver.process(topic1, "A", "reject");
-        driver.process(topic1, "B", "reject");
-        driver.process(topic1, "C", "reject");
-        driver.flushState();
+            driver.pipeInput(stringRecordFactory.create(topic1, "A", "reject"));
+            driver.pipeInput(stringRecordFactory.create(topic1, "B", "reject"));
+            driver.pipeInput(stringRecordFactory.create(topic1, "C", "reject"));
+        }
 
         final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
         processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
@@ -437,7 +436,7 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
         KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
@@ -459,7 +458,7 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
         KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 0b9c1ab..016cde2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -16,12 +16,17 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyDescription;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -30,10 +35,11 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
@@ -41,36 +47,31 @@ import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.lang.reflect.Field;
 import java.util.List;
+import java.util.Properties;
 
 import static org.easymock.EasyMock.mock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class KTableImplTest {
 
-    private final Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-    private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
+    private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+    private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
     private StreamsBuilder builder;
     private KTable<String, String> table;
 
     @Before
     public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
         builder = new StreamsBuilder();
         table = builder.table("test");
     }
@@ -110,17 +111,12 @@ public class KTableImplTest {
 
         table4.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir);
-
-        driver.process(topic1, "A", "01");
-        driver.flushState();
-        driver.process(topic1, "B", "02");
-        driver.flushState();
-        driver.process(topic1, "C", "03");
-        driver.flushState();
-        driver.process(topic1, "D", "04");
-        driver.flushState();
-        driver.flushState();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "03"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "04"));
+        }
 
         final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
         assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(0).processed);
@@ -156,104 +152,109 @@ public class KTableImplTest {
         table1.toStream().to(topic2, produced);
         final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
 
+        final Topology topology = builder.build();
+
         final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
         final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir, null, null);
+        final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames());
+
+        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
 
-        // two state store should be created
-        assertEquals(2, driver.allStateStores().size());
+            assertEquals(2, driver.getAllStateStores().size());
 
-        final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-        getter1.init(driver.context());
-        final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-        getter2.init(driver.context());
-        final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-        getter3.init(driver.context());
-        final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
-        getter4.init(driver.context());
+            final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
+            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
+            getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
 
-        assertEquals("01", getter1.get("A"));
-        assertEquals("01", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        assertEquals(new Integer(1), getter2.get("A"));
-        assertEquals(new Integer(1), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertEquals(new Integer(1), getter2.get("A"));
+            assertEquals(new Integer(1), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertEquals("01", getter4.get("A"));
-        assertEquals("01", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
 
-        assertEquals("02", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        assertEquals(new Integer(2), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertEquals(new Integer(2), getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertEquals("02", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertEquals(new Integer(2), getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
 
-        assertEquals("03", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        assertEquals(new Integer(3), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertEquals(new Integer(3), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertEquals("03", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
 
-        driver.process(topic1, "A", null);
-        driver.flushState();
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
 
-        assertNull(getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
 
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertNull(getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
 
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertNull(getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+        }
     }
 
     @Test
@@ -282,11 +283,9 @@ public class KTableImplTest {
                     }
                 });
 
-        driver.setUp(builder, stateDir, null, null);
-        driver.setTime(0L);
-
-        // two state stores should be created
-        assertEquals(2, driver.allStateStores().size());
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertEquals(2, driver.getAllStateStores().size());
+        }
     }
 
     @Test
@@ -323,15 +322,25 @@ public class KTableImplTest {
                     }
                 });
 
-        driver.setUp(builder, stateDir, null, null);
-        driver.setTime(0L);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertEquals(2, driver.getAllStateStores().size());
+        }
+    }
 
-        // two state store should be created
-        assertEquals(2, driver.allStateStores().size());
+    /** Fails with an AssertionError unless a node named {@code processorName} exists in some subtopology of {@code topology}. */
+    private void assertTopologyContainsProcessor(final Topology topology, final String processorName) {
+        for (final TopologyDescription.Subtopology subtopology: topology.describe().subtopologies()) {
+            for (final TopologyDescription.Node node: subtopology.nodes()) {
+                if (node.name().equals(processorName)) {
+                    return; // a matching node exists, so the assertion holds
+                }
+            }
+        }
+        throw new AssertionError("No processor named '" + processorName + "' found in the provided Topology:\n" + topology.describe());
+    }
 
     @Test
-    public void testRepartition() throws NoSuchFieldException, IllegalAccessException {
+    public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws NoSuchFieldException, IllegalAccessException {
         final String topic1 = "topic1";
         final String storeName1 = "storeName1";
 
@@ -341,8 +350,8 @@ public class KTableImplTest {
                 (KTableImpl<String, String, String>) builder.table(topic1,
                                                                    consumed,
                                                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
-                                                                           .withKeySerde(stringSerde)
-                                                                           .withValueSerde(stringSerde)
+                                                                           .withKeySerde(Serdes.String())
+                                                                           .withValueSerde(Serdes.String())
                 );
 
         table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
@@ -352,27 +361,26 @@ public class KTableImplTest {
         table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result2"));
 
-        driver.setUp(builder, stateDir, stringSerde, stringSerde);
-        driver.setTime(0L);
+        final Topology topology = builder.build();
+        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
 
-        // three state store should be created, one for source, one for aggregate and one for reduce
-        assertEquals(3, driver.allStateStores().size());
+            assertEquals(3, driver.getAllStateStores().size());
 
-        // contains the corresponding repartition source / sink nodes
-        assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
-        assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
-        assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
-        assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
+            assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000003");
+            assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000004");
+            assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007");
+            assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008");
 
-        Field valSerializerField  = ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
-        Field valDeserializerField  = ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
-        valSerializerField.setAccessible(true);
-        valDeserializerField.setAccessible(true);
+            Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
+            Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+            valSerializerField.setAccessible(true);
+            valDeserializerField.setAccessible(true);
 
-        assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner());
-        assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
-        assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner());
-        assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
+            assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
+            assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
+            assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
+            assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
+        }
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index a54e43e..a01d5cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -16,27 +16,30 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -45,27 +48,19 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableMapValuesTest {
 
-    private final Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-    private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+    private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) {
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-
-        driver.process(topic1, "A", "1");
-        driver.process(topic1, "B", "2");
-        driver.process(topic1, "C", "3");
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { // try-with-resources closes the driver on exit
+            driver.pipeInput(recordFactory.create(topic1, "A", "1"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4"));
+            assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed); // the captured processor must have recorded exactly these four entries, in this order
+        }
     }
 
     @Test
@@ -114,99 +109,106 @@ public class KTableMapValuesTest {
                                    final KTableImpl<String, String, Integer> table2,
                                    final KTableImpl<String, Integer, Integer> table3,
                                    final KTableImpl<String, String, String> table4) {
+
+        final Topology topology = builder.build();
+
         final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
         final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-        final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-        getter1.init(driver.context());
-        final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-        getter2.init(driver.context());
-        final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-        getter3.init(driver.context());
-        final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
-        getter4.init(driver.context());
-
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
-
-        assertEquals("01", getter1.get("A"));
-        assertEquals("01", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertEquals(new Integer(1), getter2.get("A"));
-        assertEquals(new Integer(1), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertEquals("01", getter4.get("A"));
-        assertEquals("01", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
-
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
-
-        assertEquals("02", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertEquals(new Integer(2), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertEquals(new Integer(2), getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertEquals("02", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
-
-        driver.process(topic1, "A", "03");
-        driver.flushState();
-
-        assertEquals("03", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertEquals(new Integer(3), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertEquals("03", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
-
-        driver.process(topic1, "A", null);
-        driver.flushState();
-
-        assertNull(getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertNull(getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertNull(getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+        final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames());
+
+        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) {
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+
+            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
+            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
+            getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(1), getter2.get("A"));
+            assertEquals(new Integer(1), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertEquals(new Integer(2), getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(3), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+        }
     }
 
     @Test
@@ -244,6 +246,8 @@ public class KTableMapValuesTest {
 
         final String topic1 = "topic1";
         final String topic2 = "topic2";
+        final String storeName2 = "anyMapName";
+        final String storeName3 = "anyFilterName";
 
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
@@ -253,14 +257,14 @@ public class KTableMapValuesTest {
                 public Integer apply(String value) {
                     return new Integer(value);
                 }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
+            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2).withValueSerde(Serdes.Integer()));
         final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
             new Predicate<String, Integer>() {
                 @Override
                 public boolean test(String key, Integer value) {
                     return (value % 2) == 0;
                 }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
+            }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3).withValueSerde(Serdes.Integer()));
         table1.toStream().to(topic2, produced);
         final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
 
@@ -285,37 +289,34 @@ public class KTableMapValuesTest {
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc", supplier, table2.name);
+        final Topology topology = builder.build().addProcessor("proc", supplier, table2.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
 
-        final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
 
-        assertFalse(table1.sendingOldValueEnabled());
-        assertFalse(table2.sendingOldValueEnabled());
+            assertFalse(table1.sendingOldValueEnabled());
+            assertFalse(table2.sendingOldValueEnabled());
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+            proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc.checkAndClearProcessResult("A:(3<-null)");
+            proc.checkAndClearProcessResult("A:(3<-null)");
 
-        driver.process(topic1, "A", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
 
-        proc.checkAndClearProcessResult("A:(null<-null)");
+            proc.checkAndClearProcessResult("A:(null<-null)");
+        }
     }
 
     @Test
@@ -340,34 +341,31 @@ public class KTableMapValuesTest {
 
         builder.build().addProcessor("proc", supplier, table2.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
 
-        final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
 
-        assertTrue(table1.sendingOldValueEnabled());
-        assertTrue(table2.sendingOldValueEnabled());
+            assertTrue(table1.sendingOldValueEnabled());
+            assertTrue(table2.sendingOldValueEnabled());
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+            proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc.checkAndClearProcessResult("A:(3<-2)");
+            proc.checkAndClearProcessResult("A:(3<-2)");
 
-        driver.process(topic1, "A", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
 
-        proc.checkAndClearProcessResult("A:(null<-3)");
+            proc.checkAndClearProcessResult("A:(null<-3)");
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 504d841..80a60ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -16,22 +16,26 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
+import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -42,17 +46,9 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
 
-    final private Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde);
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testKTable() {
@@ -60,38 +56,38 @@ public class KTableSourceTest {
 
         final String topic1 = "topic1";
 
-        final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
+        final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer()));
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         table1.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir);
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 2);
-        driver.process(topic1, "C", 3);
-        driver.process(topic1, "D", 4);
-        driver.flushState();
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+        final ConsumerRecordFactory<String, Integer> integerFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(integerFactory.create(topic1, "A", 1));
+            driver.pipeInput(integerFactory.create(topic1, "B", 2));
+            driver.pipeInput(integerFactory.create(topic1, "C", 3));
+            driver.pipeInput(integerFactory.create(topic1, "D", 4));
+            driver.pipeInput(integerFactory.create(topic1, "A", null));
+            driver.pipeInput(integerFactory.create(topic1, "B", null));
+        }
 
         assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), supplier.theCapturedProcessor().processed);
     }
 
     @Test
     public void kTableShouldLogAndMeterOnSkippedRecords() {
-        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-        streamsBuilder.table(topic, Consumed.with(stringSerde, intSerde));
+        builder.table(topic, stringConsumed);
 
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.setUp(streamsBuilder, stateDir);
-        driver.process(topic, null, "value");
-        driver.flushState();
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, null, "value"));
+            LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[-1] offset=[-1]"));
+            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
+        }
     }
 
     @Test
@@ -102,39 +98,45 @@ public class KTableSourceTest {
 
         final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
+        final Topology topology = builder.build();
+
         final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir);
-        final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-        getter1.init(driver.context());
+        final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
+
+        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) {
+            final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        assertEquals("01", getter1.get("A"));
-        assertEquals("01", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        assertEquals("02", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        driver.process(topic1, "A", "03");
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        assertEquals("03", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+            driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
 
-        assertNull(getter1.get("A"));
-        assertNull(getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertNull(getter1.get("A"));
+            assertNull(getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+        }
 
     }
 
@@ -148,35 +150,32 @@ public class KTableSourceTest {
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc1", supplier, table1.name);
+        final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
 
-        final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
+            proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc1.checkAndClearProcessResult("A:(03<-null)");
+            proc1.checkAndClearProcessResult("A:(03<-null)");
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+            driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
 
-        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        }
     }
 
     @Test
@@ -193,34 +192,31 @@ public class KTableSourceTest {
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc1", supplier, table1.name);
+        final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
 
-        final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
+            proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc1.checkAndClearProcessResult("A:(03<-02)");
+            proc1.checkAndClearProcessResult("A:(03<-02)");
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+            driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
 
-        proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+            proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+        }
     }
 }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7f75265..155e54c 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -177,7 +177,7 @@ public class TopologyTestDriver implements Closeable {
 
     private final static int PARTITION_ID = 0;
     private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
-    private final StreamTask task;
+    final StreamTask task;
     private final GlobalStateUpdateTask globalStateTask;
     private final GlobalStateManager globalStateManager;
 
@@ -185,7 +185,8 @@ public class TopologyTestDriver implements Closeable {
 
     private final StateDirectory stateDirectory;
     private final Metrics metrics;
-    private final ProcessorTopology processorTopology;
+    final ProcessorTopology processorTopology;
+    final ProcessorTopology globalTopology;
 
     private final MockProducer<byte[], byte[]> producer;
 
@@ -240,7 +241,7 @@ public class TopologyTestDriver implements Closeable {
         internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
 
         processorTopology = internalTopologyBuilder.build(null);
-        final ProcessorTopology globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
         final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.