You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:29 UTC

[15/50] [abbrv] kafka git commit: KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 9cafe8b..efb17fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -26,11 +26,13 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -39,7 +41,23 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableMapValuesTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testKTable() {
@@ -58,7 +76,7 @@ public class KTableMapValuesTest {
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "02");
@@ -70,230 +88,211 @@ public class KTableMapValuesTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-            String topic2 = "topic2";
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(stringSerde, stringSerde, topic2);
-
-            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-
-            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            getter1.init(driver.context());
-            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-            getter2.init(driver.context());
-            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-            getter3.init(driver.context());
-            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
-            getter4.init(driver.context());
-
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
-
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(1), getter2.get("A"));
-            assertEquals(new Integer(1), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("01", getter4.get("A"));
-            assertEquals("01", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
-
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(2), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertEquals(new Integer(2), getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("02", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "03");
-
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(3), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("03", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", null);
-
-            assertNull(getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertNull(getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertNull(getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-        } finally {
-            Utils.delete(stateDir);
-        }
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                table1.through(stringSerde, stringSerde, topic2);
+
+        KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+        KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+        KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+        KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
+
+        KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+        getter1.init(driver.context());
+        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+        getter2.init(driver.context());
+        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+        getter3.init(driver.context());
+        KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+        getter4.init(driver.context());
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
+
+        assertEquals("01", getter1.get("A"));
+        assertEquals("01", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(1), getter2.get("A"));
+        assertEquals(new Integer(1), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertNull(getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("01", getter4.get("A"));
+        assertEquals("01", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
+
+        assertEquals("02", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(2), getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertEquals(new Integer(2), getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("02", getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", "03");
+
+        assertEquals("03", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(3), getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("03", getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", null);
+
+        assertNull(getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertNull(getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertNull(getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
     }
 
     @Test
     public void testNotSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
+        String topic1 = "topic1";
 
-            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
 
-            builder.addProcessor("proc", proc, table2.name);
+        MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc", proc, table2.name);
 
-            assertFalse(table1.sendingOldValueEnabled());
-            assertFalse(table2.sendingOldValueEnabled());
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        assertFalse(table1.sendingOldValueEnabled());
+        assertFalse(table2.sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
-            proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
-            proc.checkAndClearProcessResult("A:(3<-null)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
+        proc.checkAndClearProcessResult("A:(3<-null)");
 
-            proc.checkAndClearProcessResult("A:(null<-null)");
+        driver.process(topic1, "A", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc.checkAndClearProcessResult("A:(null<-null)");
     }
 
     @Test
     public void testSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
+        String topic1 = "topic1";
 
-            table2.enableSendingOldValues();
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
 
-            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+        table2.enableSendingOldValues();
 
-            builder.addProcessor("proc", proc, table2.name);
+        MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc", proc, table2.name);
 
-            assertTrue(table1.sendingOldValueEnabled());
-            assertTrue(table2.sendingOldValueEnabled());
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        assertTrue(table1.sendingOldValueEnabled());
+        assertTrue(table2.sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
-            proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
 
-            proc.checkAndClearProcessResult("A:(3<-2)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
+        proc.checkAndClearProcessResult("A:(3<-2)");
 
-            proc.checkAndClearProcessResult("A:(null<-3)");
+        driver.process(topic1, "A", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc.checkAndClearProcessResult("A:(null<-3)");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 7c158e2..aaa6cc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -24,11 +24,13 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -36,7 +38,23 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testKTable() {
@@ -49,7 +67,7 @@ public class KTableSourceTest {
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);
@@ -63,138 +81,120 @@ public class KTableSourceTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            String topic1 = "topic1";
+        String topic1 = "topic1";
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
-            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+        KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            getter1.init(driver.context());
+        KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+        getter1.init(driver.context());
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertEquals("01", getter1.get("A"));
+        assertEquals("01", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertEquals("02", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-            driver.process(topic1, "A", "03");
+        driver.process(topic1, "A", "03");
 
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertEquals("03", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-            assertNull(getter1.get("A"));
-            assertNull(getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertNull(getter1.get("A"));
+        assertNull(getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-        } finally {
-            Utils.delete(stateDir);
-        }
     }
 
     @Test
     public void testNotSedingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        String topic1 = "topic1";
 
-            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
-            builder.addProcessor("proc1", proc1, table1.name);
+        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc1", proc1, table1.name);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
-            proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
 
-            proc1.checkAndClearProcessResult("A:(03<-null)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        proc1.checkAndClearProcessResult("A:(03<-null)");
 
-            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
     }
 
     @Test
     public void testSedingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        String topic1 = "topic1";
 
-            table1.enableSendingOldValues();
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
-            assertTrue(table1.sendingOldValueEnabled());
+        table1.enableSendingOldValues();
 
-            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        assertTrue(table1.sendingOldValueEnabled());
 
-            builder.addProcessor("proc1", proc1, table1.name);
+        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc1", proc1, table1.name);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
-            proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
 
-            proc1.checkAndClearProcessResult("A:(03<-02)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        proc1.checkAndClearProcessResult("A:(03<-02)");
 
-            proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
index 22948ab..c8707af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -39,11 +40,19 @@ public class KeyValuePrinterProcessorTest {
 
     private String topicName = "topic";
     private Serde<String> stringSerde = Serdes.String();
-    private Serde<byte[]> bytesSerde = Serdes.ByteArray();
     private ByteArrayOutputStream baos = new ByteArrayOutputStream();
     private KStreamBuilder builder = new KStreamBuilder();
     private PrintStream printStream = new PrintStream(baos);
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testPrintKeyValueDefaultSerde() throws Exception {
@@ -57,7 +66,7 @@ public class KeyValuePrinterProcessorTest {
         KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
         stream.process(keyValuePrinter);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < suppliedKeys.length; i++) {
             driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
         }
@@ -79,7 +88,7 @@ public class KeyValuePrinterProcessorTest {
 
         stream.process(keyValuePrinter);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         String suppliedKey = null;
         byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index d738794..7316804 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 
@@ -82,6 +83,12 @@ public class KStreamTestDriver {
 
     public void process(String topicName, Object key, Object value) {
         currNode = topology.source(topicName);
+
+        // records addressed to a state changelog topic (name ends with the
+        // changelog suffix) are not forwarded through the topology; skip them
+        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))
+            return;
+
         try {
             forward(key, value);
         } finally {
@@ -108,10 +115,6 @@ public class KStreamTestDriver {
         context.setTime(timestamp);
     }
 
-    public StateStore getStateStore(String name) {
-        return context.getStateStore(name);
-    }
-
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value) {
         ProcessorNode thisNode = currNode;
@@ -153,6 +156,23 @@ public class KStreamTestDriver {
         }
     }
 
+    public void close() {
+        // close all processors
+        for (ProcessorNode node : topology.processors()) {
+            currNode = node;
+            try {
+                node.close();
+            } finally {
+                currNode = null;
+            }
+        }
+
+        // close all state stores
+        for (StateStore store : context.allStateStores().values()) {
+            store.close();
+        }
+    }
+
     public Set<String> allProcessorNames() {
         Set<String> names = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
index ae8c2fd..769ee71 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
@@ -26,7 +26,7 @@ public class MockKeyValueMapper {
 
         @Override
         public KeyValue<K, V> apply(K key, V value) {
-            return new KeyValue<>(key, value);
+            return KeyValue.pair(key, value);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
new file mode 100644
index 0000000..4d44166
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+
+public class MockValueJoiner {
+
+    private static class StringJoin implements ValueJoiner<String, String, String> {
+
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    public final static ValueJoiner<String, String, String> STRING_JOINER = new StringJoin();
+}
\ No newline at end of file