You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/19 02:18:36 UTC

[kafka] branch trunk updated: KAFKA-6474: remove KStreamTestDriver (#6732)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c140f09  KAFKA-6474: remove KStreamTestDriver (#6732)
c140f09 is described below

commit c140f09406b119d86d43bba6248d17fe0120a4dd
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat May 18 21:18:14 2019 -0500

    KAFKA-6474: remove KStreamTestDriver (#6732)
    
    The implementation of KIP-258 broke the state store methods in KStreamTestDriver.
    These methods were unused in this project, so the breakage was not detected.
    Since this is an internal testing utility that had been deprecated and partially removed
    in favor of TopologyTestDriver, I opted to simply complete the removal of the class.
    
    Reviewers: A. Sophie Blee-Goldman <ab...@gmail.com>, Boyang Chen <bo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kstream/internals/KStreamTransformTest.java    |  67 ++--
 .../kstream/internals/KTableAggregateTest.java     | 411 ++++++++-------------
 .../org/apache/kafka/test/KStreamTestDriver.java   | 277 --------------
 .../apache/kafka/streams/TopologyTestDriver.java   |   1 -
 4 files changed, 186 insertions(+), 570 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 8f87d40..fcf6aea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
@@ -30,24 +31,22 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Properties;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamTransformTest {
-    private final String topicName = "topic";
+    private static final String TOPIC_NAME = "topic";
     private final ConsumerRecordFactory<Integer, Integer> recordFactory =
         new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer(), 0L);
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
 
-    @SuppressWarnings("deprecation")
-    @Rule
-    public final org.apache.kafka.test.KStreamTestDriver kstreamDriver = new org.apache.kafka.test.KStreamTestDriver();
-
     @Test
     public void testTransform() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -57,7 +56,13 @@ public class KStreamTransformTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext context) {}
+                public void init(final ProcessorContext context) {
+                    context.schedule(
+                        Duration.ofMillis(1),
+                        PunctuationType.WALL_CLOCK_TIME,
+                        timestamp -> context.forward(-1, (int) timestamp)
+                    );
+                }
 
                 @Override
                 public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
@@ -72,27 +77,39 @@ public class KStreamTransformTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
-        kstreamDriver.setUp(builder);
-        for (final int expectedKey : expectedKeys) {
-            kstreamDriver.setTime(expectedKey / 2L);
-            kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
-        }
-
-        // TODO: un-comment after replaced with TopologyTestDriver
-        //kstreamDriver.punctuate(2);
-        //kstreamDriver.punctuate(3);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(
+            builder.build(),
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test")
+            )),
+            0L)) {
+            final ConsumerRecordFactory<Integer, Integer> recordFactory =
+                new ConsumerRecordFactory<>(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
 
-        //assertEquals(6, processor.theCapturedProcessor().processed.size());
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(expectedKey, expectedKey * 10, expectedKey / 2L));
+            }
 
-        //String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+            driver.advanceWallClockTime(2);
+            driver.advanceWallClockTime(1);
 
-        final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 5)", "200:1110 (ts: 50)", "2000:11110 (ts: 500)"};
+            final String[] expected = {
+                "2:10 (ts: 0)",
+                "20:110 (ts: 5)",
+                "200:1110 (ts: 50)",
+                "2000:11110 (ts: 500)",
+                "-1:2 (ts: 2)",
+                "-1:3 (ts: 3)"
+            };
 
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
+            assertEquals(expected.length, processor.theCapturedProcessor().processed.size());
+            for (int i = 0; i < expected.length; i++) {
+                assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
+            }
         }
     }
 
@@ -125,15 +142,15 @@ public class KStreamTransformTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+                driver.pipeInput(recordFactory.create(TOPIC_NAME, expectedKey, expectedKey * 10, 0L));
             }
 
-            // This tick will yield yields the "-1:2" result
+            // This tick yields the "-1:2" result
             driver.advanceWallClockTime(2);
             // This tick further advances the clock to 3, which leads to the "-1:3" result
             driver.advanceWallClockTime(1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 6144051..b704e13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -18,52 +18,38 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.junit.Assert.assertEquals;
 
-@SuppressWarnings("deprecation")
 public class KTableAggregateTest {
     private final Serde<String> stringSerde = Serdes.String();
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-    private final Grouped<String, String> stringSerialzied = Grouped.with(stringSerde, stringSerde);
+    private final Grouped<String, String> stringSerialized = Grouped.with(stringSerde, stringSerde);
     private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
 
-    private File stateDir = null;
-
-    @Rule
-    public EmbeddedKafkaCluster cluster = null;
-    @Rule
-    public final org.apache.kafka.test.KStreamTestDriver driver = new org.apache.kafka.test.KStreamTestDriver();
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
-
     @Test
     public void testAggBasic() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -73,7 +59,7 @@ public class KTableAggregateTest {
         final KTable<String, String> table2 = table1
             .groupBy(
                 MockMapper.noOpKeyValueMapper(),
-                stringSerialzied)
+                stringSerialized)
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -83,76 +69,43 @@ public class KTableAggregateTest {
 
         table2.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-
-        driver.setTime(10L);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        driver.setTime(20L);
-        driver.process(topic1, "A", "3");
-        driver.flushState();
-        driver.setTime(18L);
-        driver.process(topic1, "B", "4");
-        driver.flushState();
-        driver.setTime(5L);
-        driver.process(topic1, "C", "5");
-        driver.flushState();
-        driver.setTime(25L);
-        driver.process(topic1, "D", "6");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(topic1, "B", "7");
-        driver.flushState();
-        driver.setTime(10L);
-        driver.process(topic1, "C", "8");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "A:0+1 (ts: 10)",
-                "B:0+2 (ts: 15)",
-                "A:0+1-1+3 (ts: 20)",
-                "B:0+2-2+4 (ts: 18)",
-                "C:0+5 (ts: 5)",
-                "D:0+6 (ts: 25)",
-                "B:0+2-2+4-4+7 (ts: 18)",
-                "C:0+5-5+8 (ts: 10)"),
-            supplier.theCapturedProcessor().processed);
-    }
-
-
-    @Test
-    public void testAggCoalesced() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final String topic1 = "topic1";
-
-        final KTable<String, String> table1 = builder.table(topic1, consumed);
-        final KTable<String, String> table2 = table1
-            .groupBy(
-                MockMapper.noOpKeyValueMapper(),
-                stringSerialzied)
-            .aggregate(MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                MockAggregator.TOSTRING_REMOVER,
-                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic1-Canonized")
-                    .withValueSerde(stringSerde));
-
-        table2.toStream().process(supplier);
-
-        driver.setUp(builder, stateDir);
-
-        driver.setTime(10L);
-        driver.process(topic1, "A", "1");
-        driver.setTime(20L);
-        driver.process(topic1, "A", "3");
-        driver.setTime(15L);
-        driver.process(topic1, "A", "4");
-        driver.flushState();
-
-        assertEquals(Collections.singletonList("A:0+4 (ts: 15)"), supplier.theCapturedProcessor().processed);
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 15L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "3", 20L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "4", 18L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "5", 5L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "6", 25L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "7", 15L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "8", 10L));
+
+            assertEquals(
+                asList(
+                    "A:0+1 (ts: 10)",
+                    "B:0+2 (ts: 15)",
+                    "A:0+1-1 (ts: 20)",
+                    "A:0+1-1+3 (ts: 20)",
+                    "B:0+2-2 (ts: 18)",
+                    "B:0+2-2+4 (ts: 18)",
+                    "C:0+5 (ts: 5)",
+                    "D:0+6 (ts: 25)",
+                    "B:0+2-2+4-4 (ts: 18)",
+                    "B:0+2-2+4-4+7 (ts: 18)",
+                    "C:0+5-5 (ts: 10)",
+                    "C:0+5-5+8 (ts: 10)"),
+                supplier.theCapturedProcessor().processed);
+        }
     }
 
     @Test
@@ -173,7 +126,7 @@ public class KTableAggregateTest {
                             return KeyValue.pair(value, value);
                     }
                 },
-                stringSerialzied)
+                stringSerialized)
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -183,78 +136,74 @@ public class KTableAggregateTest {
 
         table2.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir);
-
-        driver.setTime(10L);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(topic1, "A", null);
-        driver.flushState();
-        driver.setTime(12L);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        driver.setTime(20L);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        driver.setTime(25L);
-        driver.process(topic1, "null", "3");
-        driver.flushState();
-        driver.setTime(23L);
-        driver.process(topic1, "B", "4");
-        driver.flushState();
-        driver.setTime(24L);
-        driver.process(topic1, "NULL", "5");
-        driver.flushState();
-        driver.setTime(22L);
-        driver.process(topic1, "B", "7");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "1:0+1 (ts: 10)",
-                "1:0+1-1 (ts: 15)",
-                "1:0+1-1+1 (ts: 15)",
-                "2:0+2 (ts: 20)",
-                  //noop
-                "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)",
-                  //noop
-                "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"),
-            supplier.theCapturedProcessor().processed);
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 12L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 20L));
+            driver.pipeInput(recordFactory.create(topic1, "null", "3", 25L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "4", 23L));
+            driver.pipeInput(recordFactory.create(topic1, "NULL", "5", 24L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "7", 22L));
+
+            assertEquals(
+                asList(
+                    "1:0+1 (ts: 10)",
+                    "1:0+1-1 (ts: 15)",
+                    "1:0+1-1+1 (ts: 15)",
+                    "2:0+2 (ts: 20)",
+                    //noop
+                    "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)",
+                    //noop
+                    "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"),
+                supplier.theCapturedProcessor().processed);
+        }
     }
 
-    private void testCountHelper(final StreamsBuilder builder,
-                                 final String input,
-                                 final MockProcessorSupplier<String, Object> supplier) {
-        driver.setUp(builder, stateDir);
-
-        driver.setTime(10L);
-        driver.process(input, "A", "green");
-        driver.flushState();
-        driver.setTime(9L);
-        driver.process(input, "B", "green");
-        driver.flushState();
-        driver.setTime(12L);
-        driver.process(input, "A", "blue");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(input, "C", "yellow");
-        driver.flushState();
-        driver.setTime(11L);
-        driver.process(input, "D", "green");
-        driver.flushState();
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "green:1 (ts: 10)",
-                "green:2 (ts: 10)",
-                "green:1 (ts: 12)", "blue:1 (ts: 12)",
-                "yellow:1 (ts: 15)",
-                "green:2 (ts: 12)"),
-            supplier.theCapturedProcessor().processed);
+    private static void testCountHelper(final StreamsBuilder builder,
+                                        final String input,
+                                        final MockProcessorSupplier<String, Object> supplier) {
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            driver.pipeInput(recordFactory.create(input, "A", "green", 10L));
+            driver.pipeInput(recordFactory.create(input, "B", "green", 9L));
+            driver.pipeInput(recordFactory.create(input, "A", "blue", 12L));
+            driver.pipeInput(recordFactory.create(input, "C", "yellow", 15L));
+            driver.pipeInput(recordFactory.create(input, "D", "green", 11L));
+
+            assertEquals(
+                asList(
+                    "green:1 (ts: 10)",
+                    "green:2 (ts: 10)",
+                    "green:1 (ts: 12)", "blue:1 (ts: 12)",
+                    "yellow:1 (ts: 15)",
+                    "green:2 (ts: 12)"),
+                supplier.theCapturedProcessor().processed);
+        }
     }
 
+
     @Test
     public void testCount() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -262,7 +211,7 @@ public class KTableAggregateTest {
 
         builder
             .table(input, consumed)
-            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied)
+            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
             .count(Materialized.as("count"))
             .toStream()
             .process(supplier);
@@ -277,7 +226,7 @@ public class KTableAggregateTest {
 
         builder
             .table(input, consumed)
-            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied)
+            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
             .count()
             .toStream()
             .process(supplier);
@@ -286,43 +235,6 @@ public class KTableAggregateTest {
     }
 
     @Test
-    public void testCountCoalesced() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final String input = "count-test-input";
-        final MockProcessorSupplier<String, Long> supplier = new MockProcessorSupplier<>();
-
-        builder
-            .table(input, consumed)
-            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied)
-            .count(Materialized.as("count"))
-            .toStream()
-            .process(supplier);
-
-        driver.setUp(builder, stateDir);
-
-        final MockProcessor<String, Long> proc = supplier.theCapturedProcessor();
-
-        driver.setTime(10L);
-        driver.process(input, "A", "green");
-        driver.setTime(8L);
-        driver.process(input, "B", "green");
-        driver.setTime(9L);
-        driver.process(input, "A", "blue");
-        driver.setTime(10L);
-        driver.process(input, "C", "yellow");
-        driver.setTime(15L);
-        driver.process(input, "D", "green");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "blue:1 (ts: 9)",
-                "yellow:1 (ts: 10)",
-                "green:2 (ts: 15)"),
-            proc.processed);
-    }
-
-    @Test
     public void testRemoveOldBeforeAddNew() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
@@ -334,7 +246,7 @@ public class KTableAggregateTest {
                 (key, value) -> KeyValue.pair(
                     String.valueOf(key.charAt(0)),
                     String.valueOf(key.charAt(1))),
-                stringSerialzied)
+                stringSerialized)
             .aggregate(
                 () -> "",
                 (aggKey, value, aggregate) -> aggregate + value,
@@ -344,70 +256,35 @@ public class KTableAggregateTest {
             .toStream()
             .process(supplier);
 
-        driver.setUp(builder, stateDir);
-
-        final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
-
-        driver.setTime(10L);
-        driver.process(input, "11", "A");
-        driver.flushState();
-        driver.setTime(8L);
-        driver.process(input, "12", "B");
-        driver.flushState();
-        driver.setTime(12L);
-        driver.process(input, "11", null);
-        driver.flushState();
-        driver.setTime(6L);
-        driver.process(input, "12", "C");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "1:1 (ts: 10)",
-                "1:12 (ts: 10)",
-                "1:2 (ts: 12)",
-                "1:2 (ts: 12)"),
-            proc.processed);
-    }
-
-    @Test
-    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() {
-        final String tableOne = "tableOne";
-        final String tableTwo = "tableTwo";
-        final StreamsBuilder builder = new StreamsBuilder();
-        final String reduceTopic = "TestDriver-reducer-store-repartition";
-        final Map<String, Long> reduceResults = new HashMap<>();
-
-        final KTable<String, String> one = builder.table(tableOne, consumed);
-        final KTable<Long, String> two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String()));
-
-        final KTable<String, Long> reduce = two
-            .groupBy(
-                (key, value) -> new KeyValue<>(value, key),
-                Grouped.with(Serdes.String(), Serdes.Long()))
-            .reduce(
-                (value1, value2) -> value1 + value2,
-                (value1, value2) -> value1 - value2,
-                Materialized.as("reducer-store"));
-
-        reduce.toStream().foreach(reduceResults::put);
-
-        one.leftJoin(reduce, (value1, value2) -> value1 + ":" + value2)
-            .mapValues(value -> value);
-
-        driver.setUp(builder, stateDir, 111);
-        driver.process(reduceTopic, "1", new Change<>(1L, null));
-        driver.process("tableOne", "2", "2");
-        // this should trigger eviction on the reducer-store topic
-        driver.process(reduceTopic, "2", new Change<>(2L, null));
-        // this wont as it is the same value
-        driver.process(reduceTopic, "2", new Change<>(2L, null));
-        assertEquals(Long.valueOf(2L), reduceResults.get("2"));
-
-        // this will trigger eviction on the tableOne topic
-        // that in turn will cause an eviction on reducer-topic. It will flush
-        // key 2 as it is the only dirty entry in the cache
-        driver.process("tableOne", "1", "5");
-        assertEquals(Long.valueOf(4L), reduceResults.get("2"));
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
+
+            driver.pipeInput(recordFactory.create(input, "11", "A", 10L));
+            driver.pipeInput(recordFactory.create(input, "12", "B", 8L));
+            driver.pipeInput(recordFactory.create(input, "11", (String) null, 12L));
+            driver.pipeInput(recordFactory.create(input, "12", "C", 6L));
+
+            assertEquals(
+                asList(
+                    "1:1 (ts: 10)",
+                    "1:12 (ts: 10)",
+                    "1:2 (ts: 12)",
+                    "1: (ts: 12)",
+                    "1:2 (ts: 12)"
+                ),
+                proc.processed
+            );
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
deleted file mode 100644
index b83936b..0000000
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TopologyWrapper;
-import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.junit.rules.ExternalResource;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-/**
- * KStreamTestDriver
- *
- * @deprecated please use {@link org.apache.kafka.streams.TopologyTestDriver} instead
- */
-@Deprecated
-public class KStreamTestDriver extends ExternalResource {
-
-    private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
-
-    private ProcessorTopology topology;
-    private InternalMockProcessorContext context;
-    private ProcessorTopology globalTopology;
-    private final LogContext logContext = new LogContext("testCache ");
-
-    public void setUp(final StreamsBuilder builder) {
-        setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
-    }
-
-    public void setUp(final StreamsBuilder builder, final File stateDir) {
-        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
-    }
-
-    public void setUp(final StreamsBuilder builder, final File stateDir, final long cacheSize) {
-        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
-    }
-
-    public void setUp(final StreamsBuilder builder,
-                      final File stateDir,
-                      final Serde<?> keySerde,
-                      final Serde<?> valSerde) {
-        setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
-    }
-
-    public void setUp(final StreamsBuilder builder,
-                      final File stateDir,
-                      final Serde<?> keySerde,
-                      final Serde<?> valSerde,
-                      final long cacheSize) {
-        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
-
-        internalTopologyBuilder.setApplicationId("TestDriver");
-        topology = internalTopologyBuilder.build(null);
-        globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
-
-        final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", null));
-
-        // init global topology first as it will add stores to the
-        // store map that are required for joins etc.
-        if (globalTopology != null) {
-            initTopology(globalTopology, globalTopology.globalStateStores());
-        }
-        initTopology(topology, topology.stateStores());
-    }
-
-    @Override
-    protected void after() {
-        if (topology != null) {
-            close();
-        }
-    }
-
-    private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
-        for (final StateStore store : stores) {
-            try {
-                store.init(context, store);
-            } catch (final RuntimeException e) {
-                new RuntimeException("Fatal exception initializing store.", e).printStackTrace();
-                throw e;
-            }
-        }
-
-        for (final ProcessorNode node : topology.processors()) {
-            context.setCurrentNode(node);
-            try {
-                node.init(context);
-            } finally {
-                context.setCurrentNode(null);
-            }
-        }
-    }
-
-    public ProcessorTopology topology() {
-        return topology;
-    }
-
-    public ProcessorContext context() {
-        return context;
-    }
-
-    public void process(final String topicName, final Object key, final Object value) {
-        final ProcessorNode prevNode = context.currentNode();
-        final ProcessorNode currNode = sourceNodeByTopicName(topicName);
-
-        if (currNode != null) {
-            context.setRecordContext(createRecordContext(topicName, context.timestamp()));
-            context.setCurrentNode(currNode);
-            try {
-                context.forward(key, value);
-            } finally {
-                context.setCurrentNode(prevNode);
-            }
-        }
-    }
-
-    private ProcessorNode sourceNodeByTopicName(final String topicName) {
-        ProcessorNode topicNode = topology.source(topicName);
-        if (topicNode == null) {
-            for (final String sourceTopic : topology.sourceTopics()) {
-                if (Pattern.compile(sourceTopic).matcher(topicName).matches()) {
-                    return topology.source(sourceTopic);
-                }
-            }
-            if (globalTopology != null) {
-                topicNode = globalTopology.source(topicName);
-            }
-        }
-
-        return topicNode;
-    }
-
-    public void setTime(final long timestamp) {
-        context.setTime(timestamp);
-    }
-
-    public void close() {
-        // close all processors
-        for (final ProcessorNode node : topology.processors()) {
-            context.setCurrentNode(node);
-            try {
-                node.close();
-            } finally {
-                context.setCurrentNode(null);
-            }
-        }
-
-        closeState();
-    }
-
-    public Set<String> allProcessorNames() {
-        final Set<String> names = new HashSet<>();
-
-        final List<ProcessorNode> nodes = topology.processors();
-
-        for (final ProcessorNode node : nodes) {
-            names.add(node.name());
-        }
-
-        return names;
-    }
-
-    public ProcessorNode processor(final String name) {
-        final List<ProcessorNode> nodes = topology.processors();
-
-        for (final ProcessorNode node : nodes) {
-            if (node.name().equals(name)) {
-                return node;
-            }
-        }
-
-        return null;
-    }
-
-    public Map<String, StateStore> allStateStores() {
-        return context.allStateStores();
-    }
-
-    public void flushState() {
-        for (final StateStore stateStore : context.allStateStores().values()) {
-            stateStore.flush();
-        }
-    }
-
-    private void closeState() {
-        // we need to first flush all stores before trying to close any one
-        // of them since the flushing could cause eviction and hence tries to access other stores
-        flushState();
-
-        for (final StateStore stateStore : context.allStateStores().values()) {
-            stateStore.close();
-        }
-    }
-
-    private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) {
-        return new ProcessorRecordContext(timestamp, -1, -1, topicName, null);
-    }
-
-    private class MockRecordCollector extends RecordCollectorImpl {
-        MockRecordCollector() {
-            super("KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
-        }
-
-        @Override
-        public <K, V> void send(final String topic,
-                                final K key,
-                                final V value,
-                                final Headers headers,
-                                final Long timestamp,
-                                final Serializer<K> keySerializer,
-                                final Serializer<V> valueSerializer,
-                                final StreamPartitioner<? super K, ? super V> partitioner) {
-            // The serialization is skipped.
-            if (sourceNodeByTopicName(topic) != null) {
-                process(topic, key, value);
-            }
-        }
-
-        @Override
-        public <K, V> void send(final String topic,
-                                final K key,
-                                final V value,
-                                final Headers headers,
-                                final Integer partition,
-                                final Long timestamp,
-                                final Serializer<K> keySerializer,
-                                final Serializer<V> valueSerializer) {
-            // The serialization is skipped.
-            if (sourceNodeByTopicName(topic) != null) {
-                process(topic, key, value);
-            }
-        }
-
-        @Override
-        public void flush() {}
-
-        @Override
-        public void close() {}
-    }
-}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 23dbf30..38da0d8 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -587,7 +587,6 @@ public class TopologyTestDriver implements Closeable {
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
      */
-    @SuppressWarnings("WeakerAccess")
     public Map<String, StateStore> getAllStateStores() {
         final Map<String, StateStore> allStores = new HashMap<>();
         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {