You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/26 20:39:51 UTC

[1/4] kafka git commit: KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623

Repository: kafka
Updated Branches:
  refs/heads/trunk f60a3fad3 -> 1a73629bb


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 9cafe8b..efb17fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -26,11 +26,13 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -39,7 +41,23 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableMapValuesTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testKTable() {
@@ -58,7 +76,7 @@ public class KTableMapValuesTest {
         MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
         table2.toStream().process(proc2);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "02");
@@ -70,230 +88,211 @@ public class KTableMapValuesTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-            String topic2 = "topic2";
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(stringSerde, stringSerde, topic2);
-
-            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-
-            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            getter1.init(driver.context());
-            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-            getter2.init(driver.context());
-            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-            getter3.init(driver.context());
-            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
-            getter4.init(driver.context());
-
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
-
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(1), getter2.get("A"));
-            assertEquals(new Integer(1), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("01", getter4.get("A"));
-            assertEquals("01", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
-
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(2), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertEquals(new Integer(2), getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("02", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "03");
-
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(3), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("03", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", null);
-
-            assertNull(getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertNull(getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertNull(getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-        } finally {
-            Utils.delete(stateDir);
-        }
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                table1.through(stringSerde, stringSerde, topic2);
+
+        KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+        KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+        KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+        KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
+
+        KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+        getter1.init(driver.context());
+        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+        getter2.init(driver.context());
+        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+        getter3.init(driver.context());
+        KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+        getter4.init(driver.context());
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
+
+        assertEquals("01", getter1.get("A"));
+        assertEquals("01", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(1), getter2.get("A"));
+        assertEquals(new Integer(1), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertNull(getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("01", getter4.get("A"));
+        assertEquals("01", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
+
+        assertEquals("02", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(2), getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertEquals(new Integer(2), getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("02", getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", "03");
+
+        assertEquals("03", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(3), getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("03", getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", null);
+
+        assertNull(getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertNull(getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertNull(getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
     }
 
     @Test
     public void testNotSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
+        String topic1 = "topic1";
 
-            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
 
-            builder.addProcessor("proc", proc, table2.name);
+        MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc", proc, table2.name);
 
-            assertFalse(table1.sendingOldValueEnabled());
-            assertFalse(table2.sendingOldValueEnabled());
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        assertFalse(table1.sendingOldValueEnabled());
+        assertFalse(table2.sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
-            proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
-            proc.checkAndClearProcessResult("A:(3<-null)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
+        proc.checkAndClearProcessResult("A:(3<-null)");
 
-            proc.checkAndClearProcessResult("A:(null<-null)");
+        driver.process(topic1, "A", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc.checkAndClearProcessResult("A:(null<-null)");
     }
 
     @Test
     public void testSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
+        String topic1 = "topic1";
 
-            table2.enableSendingOldValues();
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
 
-            MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+        table2.enableSendingOldValues();
 
-            builder.addProcessor("proc", proc, table2.name);
+        MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc", proc, table2.name);
 
-            assertTrue(table1.sendingOldValueEnabled());
-            assertTrue(table2.sendingOldValueEnabled());
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        assertTrue(table1.sendingOldValueEnabled());
+        assertTrue(table2.sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
-            proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
 
-            proc.checkAndClearProcessResult("A:(3<-2)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
+        proc.checkAndClearProcessResult("A:(3<-2)");
 
-            proc.checkAndClearProcessResult("A:(null<-3)");
+        driver.process(topic1, "A", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc.checkAndClearProcessResult("A:(null<-3)");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 7c158e2..aaa6cc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -24,11 +24,13 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -36,7 +38,23 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testKTable() {
@@ -49,7 +67,7 @@ public class KTableSourceTest {
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);
@@ -63,138 +81,120 @@ public class KTableSourceTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            String topic1 = "topic1";
+        String topic1 = "topic1";
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
-            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+        KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            getter1.init(driver.context());
+        KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+        getter1.init(driver.context());
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertEquals("01", getter1.get("A"));
+        assertEquals("01", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertEquals("02", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-            driver.process(topic1, "A", "03");
+        driver.process(topic1, "A", "03");
 
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertEquals("03", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-            assertNull(getter1.get("A"));
-            assertNull(getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
+        assertNull(getter1.get("A"));
+        assertNull(getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
 
-        } finally {
-            Utils.delete(stateDir);
-        }
     }
 
     @Test
     public void testNotSedingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        String topic1 = "topic1";
 
-            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
-            builder.addProcessor("proc1", proc1, table1.name);
+        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc1", proc1, table1.name);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
-            proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
 
-            proc1.checkAndClearProcessResult("A:(03<-null)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        proc1.checkAndClearProcessResult("A:(03<-null)");
 
-            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
     }
 
     @Test
     public void testSedingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        String topic1 = "topic1";
 
-            table1.enableSendingOldValues();
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
-            assertTrue(table1.sendingOldValueEnabled());
+        table1.enableSendingOldValues();
 
-            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        assertTrue(table1.sendingOldValueEnabled());
 
-            builder.addProcessor("proc1", proc1, table1.name);
+        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc1", proc1, table1.name);
 
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
 
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
+        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
-            proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
 
-            driver.process(topic1, "A", "03");
+        proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
 
-            proc1.checkAndClearProcessResult("A:(03<-02)");
+        driver.process(topic1, "A", "03");
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        proc1.checkAndClearProcessResult("A:(03<-02)");
 
-            proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
index 22948ab..c8707af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -39,11 +40,19 @@ public class KeyValuePrinterProcessorTest {
 
     private String topicName = "topic";
     private Serde<String> stringSerde = Serdes.String();
-    private Serde<byte[]> bytesSerde = Serdes.ByteArray();
     private ByteArrayOutputStream baos = new ByteArrayOutputStream();
     private KStreamBuilder builder = new KStreamBuilder();
     private PrintStream printStream = new PrintStream(baos);
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
     @Test
     public void testPrintKeyValueDefaultSerde() throws Exception {
@@ -57,7 +66,7 @@ public class KeyValuePrinterProcessorTest {
         KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
         stream.process(keyValuePrinter);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < suppliedKeys.length; i++) {
             driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
         }
@@ -79,7 +88,7 @@ public class KeyValuePrinterProcessorTest {
 
         stream.process(keyValuePrinter);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         String suppliedKey = null;
         byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index d738794..7316804 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 
@@ -82,6 +83,12 @@ public class KStreamTestDriver {
 
     public void process(String topicName, Object key, Object value) {
         currNode = topology.source(topicName);
+
+        // if currNode is null, check if this topic is a changelog topic;
+        // if yes, skip
+        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))
+            return;
+
         try {
             forward(key, value);
         } finally {
@@ -108,10 +115,6 @@ public class KStreamTestDriver {
         context.setTime(timestamp);
     }
 
-    public StateStore getStateStore(String name) {
-        return context.getStateStore(name);
-    }
-
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value) {
         ProcessorNode thisNode = currNode;
@@ -153,6 +156,23 @@ public class KStreamTestDriver {
         }
     }
 
+    public void close() {
+        // close all processors
+        for (ProcessorNode node : topology.processors()) {
+            currNode = node;
+            try {
+                node.close();
+            } finally {
+                currNode = null;
+            }
+        }
+
+        // close all state stores
+        for (StateStore store : context.allStateStores().values()) {
+            store.close();
+        }
+    }
+
     public Set<String> allProcessorNames() {
         Set<String> names = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
index ae8c2fd..769ee71 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
@@ -26,7 +26,7 @@ public class MockKeyValueMapper {
 
         @Override
         public KeyValue<K, V> apply(K key, V value) {
-            return new KeyValue<>(key, value);
+            return KeyValue.pair(key, value);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
new file mode 100644
index 0000000..4d44166
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+
+public class MockValueJoiner {
+
+    private static class StringJoin implements ValueJoiner<String, String, String> {
+
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    public final static ValueJoiner<String, String, String> STRING_JOINER = new StringJoin();
+}
\ No newline at end of file


[3/4] kafka git commit: KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623

Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 5f19b9e..1bd870e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -40,6 +41,16 @@ public class KStreamSelectKeyTest {
     final private Serde<Integer> integerSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
+    private KStreamTestDriver driver;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testSelectKey() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -66,7 +77,7 @@ public class KStreamSelectKeyTest {
 
         stream.selectKey(selector).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         for (int expectedValue : expectedValues) {
             driver.process(topicName, null, expectedValue);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index a0a61f2..e0bdfbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -37,6 +38,16 @@ public class KStreamTransformTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
 
+    private KStreamTestDriver driver;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testTransform() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -76,7 +87,7 @@ public class KStreamTransformTest {
         KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
         stream.transform(transformerSupplier).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index f5f9698..aebcc76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -36,6 +37,16 @@ public class KStreamTransformValuesTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
 
+    private KStreamTestDriver driver;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testTransform() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -76,7 +87,7 @@ public class KStreamTransformValuesTest {
         stream = builder.stream(intSerde, intSerde, topicName);
         stream.transformValues(valueTransformerSupplier).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 3c7a1bd..828103a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -20,266 +20,257 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowAggregateTest {
 
-    final private Serde<String> strSerde = new Serdes.StringSerde();
+    final private Serde<String> strSerde = Serdes.String();
 
-    private class StringAdd implements Aggregator<String, String, String> {
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-        @Override
-        public String apply(String aggKey, String value, String aggregate) {
-            return aggregate + "+" + value;
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
+        driver = null;
     }
 
-    private class StringInit implements Initializer<String> {
-
-        @Override
-        public String apply() {
-            return "0";
-        }
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
     public void testAggBasic() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-            String topic1 = "topic1";
-
-            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
-            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
-                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                    strSerde,
-                    strSerde);
-
-            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-            table2.toStream().process(proc2);
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-
-            driver.setTime(0L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(1L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(2L);
-            driver.process(topic1, "C", "3");
-            driver.setTime(3L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(4L);
-            driver.process(topic1, "A", "1");
-
-            driver.setTime(5L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(6L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(7L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(8L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(9L);
-            driver.process(topic1, "C", "3");
-
-            driver.setTime(10L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(11L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(12L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(13L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(14L);
-            driver.process(topic1, "C", "3");
-
-            assertEquals(Utils.mkList(
-                    "[A@0]:0+1",
-                    "[B@0]:0+2",
-                    "[C@0]:0+3",
-                    "[D@0]:0+4",
-                    "[A@0]:0+1+1",
-
-                    "[A@0]:0+1+1+1", "[A@5]:0+1",
-                    "[B@0]:0+2+2", "[B@5]:0+2",
-                    "[D@0]:0+4+4", "[D@5]:0+4",
-                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                    "[C@0]:0+3+3", "[C@5]:0+3",
-
-                    "[A@5]:0+1+1", "[A@10]:0+1",
-                    "[B@5]:0+2+2+2", "[B@10]:0+2",
-                    "[D@5]:0+4+4", "[D@10]:0+4",
-                    "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
-                    "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
-
-        } finally {
-            Utils.delete(baseDir);
-        }
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+        KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+                HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+                strSerde,
+                strSerde);
+
+        MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        driver.setTime(0L);
+        driver.process(topic1, "A", "1");
+        driver.setTime(1L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(2L);
+        driver.process(topic1, "C", "3");
+        driver.setTime(3L);
+        driver.process(topic1, "D", "4");
+        driver.setTime(4L);
+        driver.process(topic1, "A", "1");
+
+        driver.setTime(5L);
+        driver.process(topic1, "A", "1");
+        driver.setTime(6L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(7L);
+        driver.process(topic1, "D", "4");
+        driver.setTime(8L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(9L);
+        driver.process(topic1, "C", "3");
+
+        driver.setTime(10L);
+        driver.process(topic1, "A", "1");
+        driver.setTime(11L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(12L);
+        driver.process(topic1, "D", "4");
+        driver.setTime(13L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(14L);
+        driver.process(topic1, "C", "3");
+
+        assertEquals(Utils.mkList(
+                "[A@0]:0+1",
+                "[B@0]:0+2",
+                "[C@0]:0+3",
+                "[D@0]:0+4",
+                "[A@0]:0+1+1",
+
+                "[A@0]:0+1+1+1", "[A@5]:0+1",
+                "[B@0]:0+2+2", "[B@5]:0+2",
+                "[D@0]:0+4+4", "[D@5]:0+4",
+                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                "[C@0]:0+3+3", "[C@5]:0+3",
+
+                "[A@5]:0+1+1", "[A@10]:0+1",
+                "[B@5]:0+2+2+2", "[B@10]:0+2",
+                "[D@5]:0+4+4", "[D@10]:0+4",
+                "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+                "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
     }
 
     @Test
     public void testJoin() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-            String topic1 = "topic1";
-            String topic2 = "topic2";
-
-            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
-            KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
-                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                    strSerde,
-                    strSerde);
-
-            MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
-            table1.toStream().process(proc1);
-
-            KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
-            KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(),
-                    HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
-                    strSerde,
-                    strSerde);
-
-            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-            table2.toStream().process(proc2);
-
-
-            MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
-            table1.join(table2, new ValueJoiner<String, String, String>() {
-                @Override
-                public String apply(String p1, String p2) {
-                    return p1 + "%" + p2;
-                }
-            }).toStream().process(proc3);
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-
-            driver.setTime(0L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(1L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(2L);
-            driver.process(topic1, "C", "3");
-            driver.setTime(3L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(4L);
-            driver.process(topic1, "A", "1");
-
-            proc1.checkAndClearProcessResult(
-                    "[A@0]:0+1",
-                    "[B@0]:0+2",
-                    "[C@0]:0+3",
-                    "[D@0]:0+4",
-                    "[A@0]:0+1+1"
-            );
-            proc2.checkAndClearProcessResult();
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:null",
-                    "[B@0]:null",
-                    "[C@0]:null",
-                    "[D@0]:null",
-                    "[A@0]:null"
-            );
-
-            driver.setTime(5L);
-            driver.process(topic1, "A", "1");
-            driver.setTime(6L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(7L);
-            driver.process(topic1, "D", "4");
-            driver.setTime(8L);
-            driver.process(topic1, "B", "2");
-            driver.setTime(9L);
-            driver.process(topic1, "C", "3");
-
-            proc1.checkAndClearProcessResult(
-                    "[A@0]:0+1+1+1", "[A@5]:0+1",
-                    "[B@0]:0+2+2", "[B@5]:0+2",
-                    "[D@0]:0+4+4", "[D@5]:0+4",
-                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                    "[C@0]:0+3+3", "[C@5]:0+3"
-            );
-            proc2.checkAndClearProcessResult();
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:null", "[A@5]:null",
-                    "[B@0]:null", "[B@5]:null",
-                    "[D@0]:null", "[D@5]:null",
-                    "[B@0]:null", "[B@5]:null",
-                    "[C@0]:null", "[C@5]:null"
-            );
-
-            driver.setTime(0L);
-            driver.process(topic2, "A", "a");
-            driver.setTime(1L);
-            driver.process(topic2, "B", "b");
-            driver.setTime(2L);
-            driver.process(topic2, "C", "c");
-            driver.setTime(3L);
-            driver.process(topic2, "D", "d");
-            driver.setTime(4L);
-            driver.process(topic2, "A", "a");
-
-            proc1.checkAndClearProcessResult();
-            proc2.checkAndClearProcessResult(
-                    "[A@0]:0+a",
-                    "[B@0]:0+b",
-                    "[C@0]:0+c",
-                    "[D@0]:0+d",
-                    "[A@0]:0+a+a"
-            );
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:0+1+1+1%0+a",
-                    "[B@0]:0+2+2+2%0+b",
-                    "[C@0]:0+3+3%0+c",
-                    "[D@0]:0+4+4%0+d",
-                    "[A@0]:0+1+1+1%0+a+a");
-
-            driver.setTime(5L);
-            driver.process(topic2, "A", "a");
-            driver.setTime(6L);
-            driver.process(topic2, "B", "b");
-            driver.setTime(7L);
-            driver.process(topic2, "D", "d");
-            driver.setTime(8L);
-            driver.process(topic2, "B", "b");
-            driver.setTime(9L);
-            driver.process(topic2, "C", "c");
-
-            proc1.checkAndClearProcessResult();
-            proc2.checkAndClearProcessResult(
-                    "[A@0]:0+a+a+a", "[A@5]:0+a",
-                    "[B@0]:0+b+b", "[B@5]:0+b",
-                    "[D@0]:0+d+d", "[D@5]:0+d",
-                    "[B@0]:0+b+b+b", "[B@5]:0+b+b",
-                    "[C@0]:0+c+c", "[C@5]:0+c"
-            );
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
-                    "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
-                    "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
-                    "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
-                    "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
-            );
-
-        } finally {
-            Utils.delete(baseDir);
-        }
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+        KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+                HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+                strSerde,
+                strSerde);
+
+        MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+        table1.toStream().process(proc1);
+
+        KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
+        KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+                HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
+                strSerde,
+                strSerde);
+
+        MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+
+        MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+        table1.join(table2, new ValueJoiner<String, String, String>() {
+            @Override
+            public String apply(String p1, String p2) {
+                return p1 + "%" + p2;
+            }
+        }).toStream().process(proc3);
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        driver.setTime(0L);
+        driver.process(topic1, "A", "1");
+        driver.setTime(1L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(2L);
+        driver.process(topic1, "C", "3");
+        driver.setTime(3L);
+        driver.process(topic1, "D", "4");
+        driver.setTime(4L);
+        driver.process(topic1, "A", "1");
+
+        proc1.checkAndClearProcessResult(
+                "[A@0]:0+1",
+                "[B@0]:0+2",
+                "[C@0]:0+3",
+                "[D@0]:0+4",
+                "[A@0]:0+1+1"
+        );
+        proc2.checkAndClearProcessResult();
+        proc3.checkAndClearProcessResult(
+                "[A@0]:null",
+                "[B@0]:null",
+                "[C@0]:null",
+                "[D@0]:null",
+                "[A@0]:null"
+        );
+
+        driver.setTime(5L);
+        driver.process(topic1, "A", "1");
+        driver.setTime(6L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(7L);
+        driver.process(topic1, "D", "4");
+        driver.setTime(8L);
+        driver.process(topic1, "B", "2");
+        driver.setTime(9L);
+        driver.process(topic1, "C", "3");
+
+        proc1.checkAndClearProcessResult(
+                "[A@0]:0+1+1+1", "[A@5]:0+1",
+                "[B@0]:0+2+2", "[B@5]:0+2",
+                "[D@0]:0+4+4", "[D@5]:0+4",
+                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                "[C@0]:0+3+3", "[C@5]:0+3"
+        );
+        proc2.checkAndClearProcessResult();
+        proc3.checkAndClearProcessResult(
+                "[A@0]:null", "[A@5]:null",
+                "[B@0]:null", "[B@5]:null",
+                "[D@0]:null", "[D@5]:null",
+                "[B@0]:null", "[B@5]:null",
+                "[C@0]:null", "[C@5]:null"
+        );
+
+        driver.setTime(0L);
+        driver.process(topic2, "A", "a");
+        driver.setTime(1L);
+        driver.process(topic2, "B", "b");
+        driver.setTime(2L);
+        driver.process(topic2, "C", "c");
+        driver.setTime(3L);
+        driver.process(topic2, "D", "d");
+        driver.setTime(4L);
+        driver.process(topic2, "A", "a");
+
+        proc1.checkAndClearProcessResult();
+        proc2.checkAndClearProcessResult(
+                "[A@0]:0+a",
+                "[B@0]:0+b",
+                "[C@0]:0+c",
+                "[D@0]:0+d",
+                "[A@0]:0+a+a"
+        );
+        proc3.checkAndClearProcessResult(
+                "[A@0]:0+1+1+1%0+a",
+                "[B@0]:0+2+2+2%0+b",
+                "[C@0]:0+3+3%0+c",
+                "[D@0]:0+4+4%0+d",
+                "[A@0]:0+1+1+1%0+a+a");
+
+        driver.setTime(5L);
+        driver.process(topic2, "A", "a");
+        driver.setTime(6L);
+        driver.process(topic2, "B", "b");
+        driver.setTime(7L);
+        driver.process(topic2, "D", "d");
+        driver.setTime(8L);
+        driver.process(topic2, "B", "b");
+        driver.setTime(9L);
+        driver.process(topic2, "C", "c");
+
+        proc1.checkAndClearProcessResult();
+        proc2.checkAndClearProcessResult(
+                "[A@0]:0+a+a+a", "[A@5]:0+a",
+                "[B@0]:0+b+b", "[B@5]:0+b",
+                "[D@0]:0+d+d", "[D@5]:0+d",
+                "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+                "[C@0]:0+c+c", "[C@5]:0+c"
+        );
+        proc3.checkAndClearProcessResult(
+                "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+                "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+                "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+                "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+                "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+        );
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index be0ec19..a614479 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -27,61 +27,73 @@ import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 
 public class KTableAggregateTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    @Test
-    public void testAggBasic() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-            String topic1 = "topic1";
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
-            KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
-            KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
-                                                           stringSerde,
-                                                           stringSerde
-            ).aggregate(MockInitializer.STRING_INIT,
-                        MockAggregator.STRING_ADDER,
-                        MockAggregator.STRING_REMOVER,
-                        stringSerde,
-                        "topic1-Canonized");
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
-            MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
-            table2.toStream().process(proc2);
+    @Test
+    public void testAggBasic() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
+                stringSerde,
+                stringSerde
+        ).aggregate(MockInitializer.STRING_INIT,
+                MockAggregator.STRING_ADDER,
+                MockAggregator.STRING_REMOVER,
+                stringSerde,
+                "topic1-Canonized");
 
-            driver.process(topic1, "A", "1");
-            driver.process(topic1, "B", "2");
-            driver.process(topic1, "A", "3");
-            driver.process(topic1, "B", "4");
-            driver.process(topic1, "C", "5");
-            driver.process(topic1, "D", "6");
-            driver.process(topic1, "B", "7");
-            driver.process(topic1, "C", "8");
+        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
 
-            assertEquals(Utils.mkList(
-                    "A:0+1",
-                    "B:0+2",
-                    "A:0+1+3", "A:0+1+3-1",
-                    "B:0+2+4", "B:0+2+4-2",
-                    "C:0+5",
-                    "D:0+6",
-                    "B:0+2+4-2+7", "B:0+2+4-2+7-4",
-                    "C:0+5+8", "C:0+5+8-5"), proc2.processed);
+        driver = new KStreamTestDriver(builder, stateDir);
 
-        } finally {
-            Utils.delete(baseDir);
-        }
+        driver.process(topic1, "A", "1");
+        driver.process(topic1, "B", "2");
+        driver.process(topic1, "A", "3");
+        driver.process(topic1, "B", "4");
+        driver.process(topic1, "C", "5");
+        driver.process(topic1, "D", "6");
+        driver.process(topic1, "B", "7");
+        driver.process(topic1, "C", "8");
+
+        assertEquals(Utils.mkList(
+                "A:0+1",
+                "B:0+2",
+                "A:0+1+3", "A:0+1+3-1",
+                "B:0+2+4", "B:0+2+4-2",
+                "C:0+5",
+                "D:0+6",
+                "B:0+2+4-2+7", "B:0+2+4-2+7-4",
+                "C:0+5+8", "C:0+5+8-5"), proc2.processed);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index ee26058..a3af133 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -19,25 +19,42 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class KTableFilterTest {
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testKTable() {
@@ -65,7 +82,7 @@ public class KTableFilterTest {
         table2.toStream().process(proc2);
         table3.toStream().process(proc3);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);
@@ -80,199 +97,181 @@ public class KTableFilterTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-
-            KTableImpl<String, Integer, Integer> table1 =
-                    (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
-            KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-
-            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-
-            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-
-            getter2.init(driver.context());
-            getter3.init(driver.context());
-
-            driver.process(topic1, "A", 1);
-            driver.process(topic1, "B", 1);
-            driver.process(topic1, "C", 1);
-
-            assertNull(getter2.get("A"));
-            assertNull(getter2.get("B"));
-            assertNull(getter2.get("C"));
-
-            assertEquals(1, (int) getter3.get("A"));
-            assertEquals(1, (int) getter3.get("B"));
-            assertEquals(1, (int) getter3.get("C"));
-
-            driver.process(topic1, "A", 2);
-            driver.process(topic1, "B", 2);
-
-            assertEquals(2, (int) getter2.get("A"));
-            assertEquals(2, (int) getter2.get("B"));
-            assertNull(getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertEquals(1, (int) getter3.get("C"));
-
-            driver.process(topic1, "A", 3);
-
-            assertNull(getter2.get("A"));
-            assertEquals(2, (int) getter2.get("B"));
-            assertNull(getter2.get("C"));
-
-            assertEquals(3, (int) getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertEquals(1, (int) getter3.get("C"));
-
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
-
-            assertNull(getter2.get("A"));
-            assertNull(getter2.get("B"));
-            assertNull(getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertEquals(1, (int) getter3.get("C"));
-
-        } finally {
-            Utils.delete(stateDir);
-        }
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTableImpl<String, Integer, Integer> table1 =
+                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+
+        KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+        KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
+
+        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+
+        getter2.init(driver.context());
+        getter3.init(driver.context());
+
+        driver.process(topic1, "A", 1);
+        driver.process(topic1, "B", 1);
+        driver.process(topic1, "C", 1);
+
+        assertNull(getter2.get("A"));
+        assertNull(getter2.get("B"));
+        assertNull(getter2.get("C"));
+
+        assertEquals(1, (int) getter3.get("A"));
+        assertEquals(1, (int) getter3.get("B"));
+        assertEquals(1, (int) getter3.get("C"));
+
+        driver.process(topic1, "A", 2);
+        driver.process(topic1, "B", 2);
+
+        assertEquals(2, (int) getter2.get("A"));
+        assertEquals(2, (int) getter2.get("B"));
+        assertNull(getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertNull(getter3.get("B"));
+        assertEquals(1, (int) getter3.get("C"));
+
+        driver.process(topic1, "A", 3);
+
+        assertNull(getter2.get("A"));
+        assertEquals(2, (int) getter2.get("B"));
+        assertNull(getter2.get("C"));
+
+        assertEquals(3, (int) getter3.get("A"));
+        assertNull(getter3.get("B"));
+        assertEquals(1, (int) getter3.get("C"));
+
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
+
+        assertNull(getter2.get("A"));
+        assertNull(getter2.get("B"));
+        assertNull(getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertNull(getter3.get("B"));
+        assertEquals(1, (int) getter3.get("C"));
     }
 
     @Test
     public void testNotSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-
-            KTableImpl<String, Integer, Integer> table1 =
-                    (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
-            KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
+        KStreamBuilder builder = new KStreamBuilder();
 
-            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
-            MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        String topic1 = "topic1";
 
-            builder.addProcessor("proc1", proc1, table1.name);
-            builder.addProcessor("proc2", proc2, table2.name);
+        KTableImpl<String, Integer, Integer> table1 =
+                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+
+        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc1", proc1, table1.name);
+        builder.addProcessor("proc2", proc2, table2.name);
 
-            driver.process(topic1, "A", 1);
-            driver.process(topic1, "B", 1);
-            driver.process(topic1, "C", 1);
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+        driver.process(topic1, "A", 1);
+        driver.process(topic1, "B", 1);
+        driver.process(topic1, "C", 1);
 
-            driver.process(topic1, "A", 2);
-            driver.process(topic1, "B", 2);
+        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
-            proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
-            proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        driver.process(topic1, "A", 2);
+        driver.process(topic1, "B", 2);
 
-            driver.process(topic1, "A", 3);
+        proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
-            proc1.checkAndClearProcessResult("A:(3<-null)");
-            proc2.checkAndClearProcessResult("A:(null<-null)");
+        driver.process(topic1, "A", 3);
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        proc1.checkAndClearProcessResult("A:(3<-null)");
+        proc2.checkAndClearProcessResult("A:(null<-null)");
 
-            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
-            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
     }
 
     @Test
     public void testSendingOldValue() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
+        KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, Integer, Integer> table1 =
-                    (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
-            KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
+        String topic1 = "topic1";
 
-            table2.enableSendingOldValues();
+        KTableImpl<String, Integer, Integer> table1 =
+                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
+        KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
 
-            MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
-            MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        table2.enableSendingOldValues();
 
-            builder.addProcessor("proc1", proc1, table1.name);
-            builder.addProcessor("proc2", proc2, table2.name);
+        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
+        builder.addProcessor("proc1", proc1, table1.name);
+        builder.addProcessor("proc2", proc2, table2.name);
 
-            driver.process(topic1, "A", 1);
-            driver.process(topic1, "B", 1);
-            driver.process(topic1, "C", 1);
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
 
-            proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+        driver.process(topic1, "A", 1);
+        driver.process(topic1, "B", 1);
+        driver.process(topic1, "C", 1);
 
-            driver.process(topic1, "A", 2);
-            driver.process(topic1, "B", 2);
+        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
-            proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
-            proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        driver.process(topic1, "A", 2);
+        driver.process(topic1, "B", 2);
 
-            driver.process(topic1, "A", 3);
+        proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
-            proc1.checkAndClearProcessResult("A:(3<-2)");
-            proc2.checkAndClearProcessResult("A:(null<-2)");
+        driver.process(topic1, "A", 3);
 
-            driver.process(topic1, "A", null);
-            driver.process(topic1, "B", null);
+        proc1.checkAndClearProcessResult("A:(3<-2)");
+        proc2.checkAndClearProcessResult("A:(null<-2)");
 
-            proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
-            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "B", null);
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 27a5114..af131c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
 import org.junit.Test;
 import java.util.List;
 import java.util.Locale;
@@ -39,6 +40,16 @@ public class KTableForeachTest {
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
+    private KStreamTestDriver driver;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testForeach() {
         // Given
@@ -71,7 +82,7 @@ public class KTableForeachTest {
         table.foreach(action);
 
         // Then
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 8a13e9a..ca3bbe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -33,11 +33,13 @@ import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -46,7 +48,23 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableImplTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testKTable() {
@@ -85,7 +103,7 @@ public class KTableImplTest {
         MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
         table4.toStream().process(proc4);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "02");
@@ -100,129 +118,157 @@ public class KTableImplTest {
 
     @Test
     public void testValueGetter() throws IOException {
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            final KStreamBuilder builder = new KStreamBuilder();
-
-            String topic1 = "topic1";
-            String topic2 = "topic2";
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-            KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(stringSerde, stringSerde, topic2);
-
-            KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
-            KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
-            KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-
-            // two state store should be created
-            assertEquals(2, driver.allStateStores().size());
-
-            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
-            getter1.init(driver.context());
-            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-            getter2.init(driver.context());
-            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
-            getter3.init(driver.context());
-            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
-            getter4.init(driver.context());
-
-            driver.process(topic1, "A", "01");
-            driver.process(topic1, "B", "01");
-            driver.process(topic1, "C", "01");
-
-            assertEquals("01", getter1.get("A"));
-            assertEquals("01", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(1), getter2.get("A"));
-            assertEquals(new Integer(1), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertNull(getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("01", getter4.get("A"));
-            assertEquals("01", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "02");
-            driver.process(topic1, "B", "02");
-
-            assertEquals("02", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(2), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertEquals(new Integer(2), getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("02", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", "03");
-
-            assertEquals("03", getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertEquals(new Integer(3), getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertEquals("03", getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
-
-            driver.process(topic1, "A", null);
-
-            assertNull(getter1.get("A"));
-            assertEquals("02", getter1.get("B"));
-            assertEquals("01", getter1.get("C"));
-
-            assertNull(getter2.get("A"));
-            assertEquals(new Integer(2), getter2.get("B"));
-            assertEquals(new Integer(1), getter2.get("C"));
-
-            assertNull(getter3.get("A"));
-            assertEquals(new Integer(2), getter3.get("B"));
-            assertNull(getter3.get("C"));
-
-            assertNull(getter4.get("A"));
-            assertEquals("02", getter4.get("B"));
-            assertEquals("01", getter4.get("C"));
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
+        KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+        KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
+                table1.through(stringSerde, stringSerde, topic2);
+
+        KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
+        KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
+        KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
+        KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
+
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
+
+        // two state store should be created
+        assertEquals(2, driver.allStateStores().size());
+
+        KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+        getter1.init(driver.context());
+        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+        getter2.init(driver.context());
+        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+        getter3.init(driver.context());
+        KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+        getter4.init(driver.context());
+
+        driver.process(topic1, "A", "01");
+        driver.process(topic1, "B", "01");
+        driver.process(topic1, "C", "01");
+
+        assertEquals("01", getter1.get("A"));
+        assertEquals("01", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(1), getter2.get("A"));
+        assertEquals(new Integer(1), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertNull(getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("01", getter4.get("A"));
+        assertEquals("01", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", "02");
+        driver.process(topic1, "B", "02");
+
+        assertEquals("02", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(2), getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertEquals(new Integer(2), getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("02", getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", "03");
+
+        assertEquals("03", getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertEquals(new Integer(3), getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertEquals("03", getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+
+        driver.process(topic1, "A", null);
+
+        assertNull(getter1.get("A"));
+        assertEquals("02", getter1.get("B"));
+        assertEquals("01", getter1.get("C"));
+
+        assertNull(getter2.get("A"));
+        assertEquals(new Integer(2), getter2.get("B"));
+        assertEquals(new Integer(1), getter2.get("C"));
+
+        assertNull(getter3.get("A"));
+        assertEquals(new Integer(2), getter3.get("B"));
+        assertNull(getter3.get("C"));
+
+        assertNull(getter4.get("A"));
+        assertEquals("02", getter4.get("B"));
+        assertEquals("01", getter4.get("C"));
+    }
+
+    @Test
+    public void testStateStoreLazyEval() throws IOException {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, String> table2 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
+
+        KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
+        KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setTime(0L);
+
+        // no state store should be created
+        assertEquals(0, driver.allStateStores().size());
     }
 
     @Test
@@ -230,120 +276,75 @@ public class KTableImplTest {
         String topic1 = "topic1";
         String topic2 = "topic2";
 
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            KStreamBuilder builder = new KStreamBuilder();
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, String> table2 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
-
-            KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-            KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-            driver.setTime(0L);
-
-            // no state store should be created
-            assertEquals(0, driver.allStateStores().size());
-
-        } finally {
-            Utils.delete(stateDir);
-        }
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        try {
-            KStreamBuilder builder = new KStreamBuilder();
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
-            KTableImpl<String, String, String> table2 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
-
-            KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
-                    new ValueMapper<String, Integer>() {
-                        @Override
-                        public Integer apply(String value) {
-                            return new Integer(value);
-                        }
-                    });
-            KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
-                    new Predicate<String, Integer>() {
-                        @Override
-                        public boolean test(String key, Integer value) {
-                            return (value % 2) == 0;
-                        }
-                    });
-            table2.join(table1MappedFiltered,
-                    new ValueJoiner<String, Integer, String>() {
-                        @Override
-                        public String apply(String v1, Integer v2) {
-                            return v1 + v2;
-                        }
-                    });
-
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
-            driver.setTime(0L);
-
-            // two state store should be created
-            assertEquals(2, driver.allStateStores().size());
-
-        } finally {
-            Utils.delete(stateDir);
-        }
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        KTableImpl<String, String, String> table2 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
+
+        KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
+                new ValueMapper<String, Integer>() {
+                    @Override
+                    public Integer apply(String value) {
+                        return new Integer(value);
+                    }
+                });
+        KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+                new Predicate<String, Integer>() {
+                    @Override
+                    public boolean test(String key, Integer value) {
+                        return (value % 2) == 0;
+                    }
+                });
+        table2.join(table1MappedFiltered,
+                new ValueJoiner<String, Integer, String>() {
+                    @Override
+                    public String apply(String v1, Integer v2) {
+                        return v1 + v2;
+                    }
+                });
+
+        driver = new KStreamTestDriver(builder, stateDir, null, null);
+        driver.setTime(0L);
+
+        // two state store should be created
+        assertEquals(2, driver.allStateStores().size());
     }
 
     @Test
     public void testRepartition() throws IOException {
         String topic1 = "topic1";
 
-        File stateDir = Files.createTempDirectory("test").toFile();
-        try {
-            KStreamBuilder builder = new KStreamBuilder();
-
-            KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
-                    .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
-                    .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1");
+        KTableImpl<String, String, String> table1 =
+                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
+        KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
+                .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+                .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1");
 
-            KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
-                    .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
-                    .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
-            driver.setTime(0L);
+        KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
+                .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
 
-            // three state store should be created, one for source, one for aggregate and one for reduce
-            assertEquals(3, driver.allStateStores().size());
+        driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
+        driver.setTime(0L);
 
-            // contains the corresponding repartition source / sink nodes
-            assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
-            assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
-            assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
-            assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
+        // three state store should be created, one for source, one for aggregate and one for reduce
+        assertEquals(3, driver.allStateStores().size());
 
-            assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
-            assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
-            assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
-            assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+        // contains the corresponding repartition source / sink nodes
+        assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
+        assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
+        assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
+        assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
 
-        } finally {
-            Utils.delete(stateDir);
-        }
+        assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
+        assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
+        assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
+        assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
     }
 }


[2/4] kafka git commit: KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623

Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index f6ebbe1..16015fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -42,309 +44,291 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableKTableJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-    private static class JoinedKeyValue extends KeyValue<Integer, String> {
-        public JoinedKeyValue(Integer key, String value) {
-            super(key, value);
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            processor = new MockProcessorSupplier<>();
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.join(table2, joiner);
-            joined.toStream().process(processor);
+        processor = new MockProcessorSupplier<>();
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
+        joined.toStream().process(processor);
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+        KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            KTableValueGetter<Integer, String> getter = getterSupplier.get();
-            getter.init(driver.context());
+        KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        getter.init(driver.context());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            processor.checkAndClearProcessResult("0:null", "1:null");
-            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items to the other stream. this should produce two items.
+        processor.checkAndClearProcessResult("0:null", "1:null");
+        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        // push two items to the other stream. this should produce two items.
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            // push all four items to the primary stream. this should produce four items.
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        // push all four items to the primary stream. this should produce four items.
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            // push all four items to the primary stream. this should produce four items.
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        // push all four items to the primary stream. this should produce four items.
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            processor.checkAndClearProcessResult("0:null", "1:null");
-            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            // push all four items to the primary stream. this should produce four items.
+        processor.checkAndClearProcessResult("0:null", "1:null");
+        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        // push all four items to the primary stream. this should produce four items.
 
-            processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
-            checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
-
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
     public void testNotSendingOldValues() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.join(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
-
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
     @Test
     public void testSendingOldValues() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.join(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
 
-            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-
-            // push two items to the primary stream. the other table is empty
-
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
 
-            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
-    private JoinedKeyValue kv(Integer key, String value) {
-        return new JoinedKeyValue(key, value);
+    private KeyValue<Integer, String> kv(Integer key, String value) {
+        return new KeyValue<>(key, value);
     }
 
-    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
-        for (JoinedKeyValue kv : expected) {
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, KeyValue<Integer, String>... expected) {
+        for (KeyValue<Integer, String> kv : expected) {
             String value = getter.get(kv.key);
             if (kv.value == null) {
                 assertNull(value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 449ea05..5132ce3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -19,18 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -43,313 +44,287 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableKTableLeftJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
-
-    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
-        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-            @Override
-            public KeyValue<Integer, String> apply(Integer key, String value) {
-                return KeyValue.pair(key, value);
-            }
-        };
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-    private static class JoinedKeyValue extends KeyValue<Integer, String> {
-        public JoinedKeyValue(Integer key, String value) {
-            super(key, value);
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
-
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1);
-            KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2);
-            KTable<Integer, String> joined = table1.leftJoin(table2, joiner);
-            MockProcessorSupplier<Integer, String> processor;
-            processor = new MockProcessorSupplier<>();
-            joined.toStream().process(processor);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1);
+        KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2);
+        KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
+        MockProcessorSupplier<Integer, String> processor;
+        processor = new MockProcessorSupplier<>();
+        joined.toStream().process(processor);
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-            KTableValueGetter<Integer, String> getter = getterSupplier.get();
-            getter.init(driver.context());
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            // push two items to the primary stream. the other table is empty
+        KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        getter.init(driver.context());
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
-            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
     public void testNotSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.leftJoin(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            // push two items to the primary stream. the other table is empty
-
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        // push two items to the primary stream. the other table is empty
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
-
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
     @Test
     public void testSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
-
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.leftJoin(table2, joiner);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            // push two items to the primary stream. the other table is empty
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
-    private JoinedKeyValue kv(Integer key, String value) {
-        return new JoinedKeyValue(key, value);
+    private KeyValue<Integer, String> kv(Integer key, String value) {
+        return new KeyValue<>(key, value);
     }
 
-    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
-        for (JoinedKeyValue kv : expected) {
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, KeyValue<Integer, String>... expected) {
+        for (KeyValue<Integer, String> kv : expected) {
             String value = getter.get(kv.key);
             if (kv.value == null) {
                 assertNull(value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index ea7476a..3124556 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -42,334 +44,316 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableKTableOuterJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-    private static class JoinedKeyValue extends KeyValue<Integer, String> {
-        public JoinedKeyValue(Integer key, String value) {
-            super(key, value);
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            processor = new MockProcessorSupplier<>();
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.outerJoin(table2, joiner);
-            joined.toStream().process(processor);
+        processor = new MockProcessorSupplier<>();
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
+        joined.toStream().process(processor);
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
+        KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            KTableValueGetter<Integer, String> getter = getterSupplier.get();
-            getter.init(driver.context());
+        KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        getter.init(driver.context());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items to the other stream. this should produce two items.
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        // push two items to the other stream. this should produce two items.
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
-
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
-            checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-            checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-            checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
-            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+        // push all four items to the primary stream. this should produce four items.
 
-            // push middle two items to the primary stream with null. this should produce two items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 1; i < 3; i++) {
-                driver.process(topic1, expectedKeys[i], null);
-            }
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
-            processor.checkAndClearProcessResult("1:null", "2:null+YY2");
-            checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
+        // push middle two items to the primary stream with null. this should produce two items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 1; i < 3; i++) {
+            driver.process(topic1, expectedKeys[i], null);
         }
+
+        processor.checkAndClearProcessResult("1:null", "2:null+YY2");
+        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
     public void testNotSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.outerJoin(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            // push two items to the primary stream. the other table is empty
+        // push two items to the primary stream. the other table is empty
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            // push two items to the other stream. this should produce two items.
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        // push two items to the other stream. this should produce two items.
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
-
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push middle two items to the primary stream with null. this should produce two items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 1; i < 3; i++) {
-                driver.process(topic1, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
-            proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
+        // push middle two items to the primary stream with null. this should produce two items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 1; i < 3; i++) {
+            driver.process(topic1, expectedKeys[i], null);
         }
+
+        proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
     }
 
     @Test
     public void testSendingOldValue() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> proc;
+        KTable<Integer, String> table1;
+        KTable<Integer, String> table2;
+        KTable<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(intSerde, stringSerde, topic1);
-            table2 = builder.table(intSerde, stringSerde, topic2);
-            joined = table1.outerJoin(table2, joiner);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
+        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
 
-            ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+        ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-            proc = new MockProcessorSupplier<>();
-            builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        proc = new MockProcessorSupplier<>();
+        builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-            assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-
-            // push two items to the primary stream. the other table is empty
-
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
-            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should produce two items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+        // push two items to the other stream. this should produce two items.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should produce four items.
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should produce four items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
-            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should produce two item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+        // push two items with null to the other stream as deletes. this should produce two item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
-            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push middle two items to the primary stream with null. this should produce two items.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 1; i < 3; i++) {
-                driver.process(topic1, expectedKeys[i], null);
-            }
+        proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
-            proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
+        // push middle two items to the primary stream with null. this should produce two items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 1; i < 3; i++) {
+            driver.process(topic1, expectedKeys[i], null);
         }
+
+        proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
     }
 
-    private JoinedKeyValue kv(Integer key, String value) {
-        return new JoinedKeyValue(key, value);
+    private KeyValue<Integer, String> kv(Integer key, String value) {
+        return new KeyValue<>(key, value);
     }
 
-    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, JoinedKeyValue... expected) {
-        for (JoinedKeyValue kv : expected) {
+    private void checkJoinedValues(KTableValueGetter<Integer, String> getter, KeyValue<Integer, String>... expected) {
+        for (KeyValue<Integer, String> kv : expected) {
             String value = getter.get(kv.key);
             if (kv.value == null) {
                 assertNull(value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index ce1b9d6..cf74017 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -38,6 +39,16 @@ public class KTableMapKeysTest {
     final private Serde<String> stringSerde = new Serdes.StringSerde();
     final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testMapKeysConvertingToStream() {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -70,7 +81,7 @@ public class KTableMapKeysTest {
 
         convertedStream.process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
 
         for (int i = 0;  i < originalKeys.length; i++) {
             driver.process(topic1, originalKeys[i], values[i]);


[4/4] kafka git commit: KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623

Posted by gu...@apache.org.
KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Eno Thereska, Michael G. Noll, Ismael Juma

Closes #1258 from guozhangwang/K3607


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a73629b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a73629b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a73629b

Branch: refs/heads/trunk
Commit: 1a73629bb43bbc781e5a968a61f6079365bc75b7
Parents: f60a3fa
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Apr 26 11:39:49 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 26 11:39:49 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   |  37 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   9 +-
 .../streams/kstream/KStreamBuilderTest.java     |  13 +-
 .../internals/KGroupedTableImplTest.java        |  11 +-
 .../kstream/internals/KStreamBranchTest.java    |  13 +-
 .../kstream/internals/KStreamFilterTest.java    |  15 +-
 .../kstream/internals/KStreamFlatMapTest.java   |  13 +-
 .../internals/KStreamFlatMapValuesTest.java     |  13 +-
 .../kstream/internals/KStreamForeachTest.java   |  13 +-
 .../internals/KStreamKStreamJoinTest.java       | 651 +++++++++----------
 .../internals/KStreamKStreamLeftJoinTest.java   | 341 +++++-----
 .../internals/KStreamKTableLeftJoinTest.java    | 153 +++--
 .../kstream/internals/KStreamMapTest.java       |  13 +-
 .../kstream/internals/KStreamMapValuesTest.java |  13 +-
 .../kstream/internals/KStreamSelectKeyTest.java |  13 +-
 .../kstream/internals/KStreamTransformTest.java |  13 +-
 .../internals/KStreamTransformValuesTest.java   |  13 +-
 .../internals/KStreamWindowAggregateTest.java   | 455 +++++++------
 .../kstream/internals/KTableAggregateTest.java  |  92 +--
 .../kstream/internals/KTableFilterTest.java     | 327 +++++-----
 .../kstream/internals/KTableForeachTest.java    |  13 +-
 .../kstream/internals/KTableImplTest.java       | 451 ++++++-------
 .../kstream/internals/KTableKTableJoinTest.java | 394 ++++++-----
 .../internals/KTableKTableLeftJoinTest.java     | 397 ++++++-----
 .../internals/KTableKTableOuterJoinTest.java    | 426 ++++++------
 .../kstream/internals/KTableMapKeysTest.java    |  13 +-
 .../kstream/internals/KTableMapValuesTest.java  | 381 ++++++-----
 .../kstream/internals/KTableSourceTest.java     | 172 ++---
 .../internals/KeyValuePrinterProcessorTest.java |  15 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  28 +-
 .../apache/kafka/test/MockKeyValueMapper.java   |   2 +-
 .../org/apache/kafka/test/MockValueJoiner.java  |  33 +
 32 files changed, 2349 insertions(+), 2197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 027221e..1bfe578 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -20,6 +20,8 @@ import static java.util.Arrays.asList;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -29,6 +31,7 @@ import java.util.Random;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
 
 
 /**
@@ -97,12 +100,44 @@ public class TestUtils {
     }
 
     /**
-     * Creates an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
+     * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
      * suffix to generate its name.
      */
     public static File tempFile() throws IOException {
         File file = File.createTempFile("kafka", ".tmp");
         file.deleteOnExit();
+
+        return file;
+    }
+
+    /**
+     * Create a temporary relative directory in the default temporary-file directory with the given prefix.
+     *
+     * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
+     */
+    public static File tempDirectory(String prefix) throws IOException {
+        return tempDirectory(null, prefix);
+    }
+
+    /**
+     * Create a temporary relative directory in the specified parent directory with the given prefix.
+     *
+     * @param parent The parent folder path name, if null using the default temporary-file directory
+     * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
+     */
+    public static File tempDirectory(Path parent, String prefix) throws IOException {
+        final File file = parent == null ?
+                Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() :
+                Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile();
+        file.deleteOnExit();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                Utils.delete(file);
+            }
+        });
+
         return file;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index be7741d..6bd6c63 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -95,15 +95,8 @@ object TestUtils extends Logging {
   def tempRelativeDir(parent: String): File = {
     val parentFile = new File(parent)
     parentFile.mkdirs()
-    val f = Files.createTempDirectory(parentFile.toPath, "kafka-").toFile
-    f.deleteOnExit()
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() = {
-        Utils.delete(f)
-      }
-    })
-    f
+    org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index e75b595..cdf28db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -22,12 +22,23 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
 public class KStreamBuilderTest {
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test(expected = TopologyBuilderException.class)
     public void testFrom() {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -66,7 +77,7 @@ public class KStreamBuilderTest {
         MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         driver.setTime(0L);
 
         driver.process(topic1, "A", "aa");

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 9eeea20..fc0451a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -18,19 +18,17 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.After;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
 
@@ -43,12 +41,7 @@ public class KGroupedTableImplTest {
 
     @Before
     public void setUp() throws IOException {
-        stateDir = Files.createTempDirectory("test").toFile();
-    }
-
-    @After
-    public void tearDown() throws IOException {
-        Utils.delete(stateDir);
+        stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index e04a273..0650b95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import java.lang.reflect.Array;
@@ -33,6 +34,16 @@ public class KStreamBranchTest {
 
     private String topicName = "topic";
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testKStreamBranch() {
@@ -74,7 +85,7 @@ public class KStreamBranchTest {
             branches[i].process(processors[i]);
         }
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 75465c8..4be8513 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 
+import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -32,6 +33,16 @@ public class KStreamFilterTest {
 
     private String topicName = "topic";
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
         public boolean test(Integer key, String value) {
@@ -51,7 +62,7 @@ public class KStreamFilterTest {
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filter(isMultipleOfThree).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
         }
@@ -71,7 +82,7 @@ public class KStreamFilterTest {
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filterNot(isMultipleOfThree).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index bc85757..da57d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -34,6 +35,16 @@ public class KStreamFlatMapTest {
 
     private String topicName = "topic";
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testFlatMap() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -59,7 +70,7 @@ public class KStreamFlatMapTest {
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.flatMap(mapper).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 63f5636..9d1141b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -34,6 +35,16 @@ public class KStreamFlatMapValuesTest {
 
     private String topicName = "topic";
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testFlatMapValues() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -58,7 +69,7 @@ public class KStreamFlatMapValuesTest {
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.flatMapValues(mapper).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index d0a182d..0bc5e77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
 import org.junit.Test;
 import java.util.List;
 import java.util.Locale;
@@ -39,6 +40,16 @@ public class KStreamForeachTest {
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testForeach() {
         // Given
@@ -71,7 +82,7 @@ public class KStreamForeachTest {
         stream.foreach(action);
 
         // Then
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 19a9411..6b0828a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -39,460 +41,447 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamKStreamJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
-    };
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
-
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-            KStream<Integer, String> stream1;
-            KStream<Integer, String> stream2;
-            KStream<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        KStreamBuilder builder = new KStreamBuilder();
 
-            processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(intSerde, stringSerde, topic1);
-            stream2 = builder.stream(intSerde, stringSerde, topic2);
-            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
-            joined.process(processor);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStream<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        processor = new MockProcessorSupplier<>();
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+        joined.process(processor);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            // push two items to the primary stream. the other window is empty
-            // w1 = {}
-            // w2 = {}
-            // --> w1 = { 0:X1, 1:X1 }
-            //     w2 = {}
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            processor.checkAndClearProcessResult();
+        // push two items to the primary stream. the other window is empty
+        // w1 = {}
+        // w2 = {}
+        // --> w1 = { 0:X1, 1:X1 }
+        //     w2 = {}
 
-            // push two items to the other stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = {}
-            // --> w1 = { 0:X1, 1:X1 }
-            //     w2 = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        // push two items to the other stream. this should produce two items.
+        // w1 = { 0:X0, 1:X1 }
+        // w2 = {}
+        // --> w1 = { 0:X1, 1:X1 }
+        //     w2 = { 0:Y0, 1:Y1 }
 
-            // push all four items to the primary stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        // push all four items to the primary stream. this should produce two items.
+        // w1 = { 0:X0, 1:X1 }
+        // w2 = { 0:Y0, 1:Y1 }
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        //     w2 = { 0:Y0, 1:Y1 }
 
-            // push all items to the other stream. this should produce six items.
-            // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        // push all items to the other stream. this should produce six items.
+        // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        // w2 = { 0:Y0, 1:Y1 }
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-            // push all four items to the primary stream. this should produce six items.
-            // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+        // push all four items to the primary stream. this should produce six items.
+        // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-            // push two items to the other stream. this should produce six item.
-            // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
-            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+        // push two items to the other stream. this should produce six item.
+        // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
     }
 
     @Test
     public void testOuterJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
+        KStreamBuilder builder = new KStreamBuilder();
 
-            KStreamBuilder builder = new KStreamBuilder();
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStream<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            KStream<Integer, String> stream1;
-            KStream<Integer, String> stream2;
-            KStream<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        processor = new MockProcessorSupplier<>();
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+        joined.process(processor);
 
-            processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(intSerde, stringSerde, topic1);
-            stream2 = builder.stream(intSerde, stringSerde, topic2);
-            joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
-            joined.process(processor);
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        // push two items to the primary stream. the other window is empty.this should produce two items
+        // w1 = {}
+        // w2 = {}
+        // --> w1 = { 0:X1, 1:X1 }
+        //     w2 = {}
 
-            // push two items to the primary stream. the other window is empty.this should produce two items
-            // w1 = {}
-            // w2 = {}
-            // --> w1 = { 0:X1, 1:X1 }
-            //     w2 = {}
-
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
-            // push two items to the other stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = {}
-            // --> w1 = { 0:X1, 1:X1 }
-            //     w2 = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        // push two items to the other stream. this should produce two items.
+        // w1 = { 0:X0, 1:X1 }
+        // w2 = {}
+        // --> w1 = { 0:X1, 1:X1 }
+        //     w2 = { 0:Y0, 1:Y1 }
 
-            // push all four items to the primary stream. this should produce four items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        // push all four items to the primary stream. this should produce four items.
+        // w1 = { 0:X0, 1:X1 }
+        // w2 = { 0:Y0, 1:Y1 }
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        //     w2 = { 0:Y0, 1:Y1 }
 
-            // push all items to the other stream. this should produce six items.
-            // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1 }
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        // push all items to the other stream. this should produce six items.
+        // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        // w2 = { 0:Y0, 1:Y1 }
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-            // push all four items to the primary stream. this should produce six items.
-            // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+        // push all four items to the primary stream. this should produce six items.
+        // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-            // push two items to the other stream. this should produce six item.
-            // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-            // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
-            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+        // push two items to the other stream. this should produce six item.
+        // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+        // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
     }
 
     @Test
     public void testWindowing() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            long time = 0L;
-
-            KStreamBuilder builder = new KStreamBuilder();
-
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        long time = 0L;
 
-            KStream<Integer, String> stream1;
-            KStream<Integer, String> stream2;
-            KStream<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        KStreamBuilder builder = new KStreamBuilder();
 
-            processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(intSerde, stringSerde, topic1);
-            stream2 = builder.stream(intSerde, stringSerde, topic2);
-            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
-            joined.process(processor);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStream<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        processor = new MockProcessorSupplier<>();
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+        joined.process(processor);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(time);
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            // push two items to the primary stream. the other window is empty. this should produce no items.
-            // w1 = {}
-            // w2 = {}
-            // --> w1 = { 0:X1, 1:X1 }
-            //     w2 = {}
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(time);
 
-            processor.checkAndClearProcessResult();
+        // push two items to the primary stream. the other window is empty. this should produce no items.
+        // w1 = {}
+        // w2 = {}
+        // --> w1 = { 0:X1, 1:X1 }
+        //     w2 = {}
 
-            // push two items to the other stream. this should produce two items.
-            // w1 = { 0:X0, 1:X1 }
-            // w2 = {}
-            // --> w1 = { 0:X1, 1:X1 }
-            //     w2 = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+        // push two items to the other stream. this should produce two items.
+        // w1 = { 0:X0, 1:X1 }
+        // w2 = {}
+        // --> w1 = { 0:X1, 1:X1 }
+        //     w2 = { 0:Y0, 1:Y1 }
 
-            // clear logically
-            time = 1000L;
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.setTime(time + i);
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
-            processor.checkAndClearProcessResult();
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-            // gradually expires items in w1
-            // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+        // clear logically
+        time = 1000L;
 
-            time = 1000 + 100L;
-            driver.setTime(time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.setTime(time + i);
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+        processor.checkAndClearProcessResult();
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        // gradually expires items in w1
+        // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        time = 1000 + 100L;
+        driver.setTime(time);
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("3:X3+YY3");
+        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult();
+        processor.checkAndClearProcessResult("3:X3+YY3");
 
-            // go back to the time before expiration
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            time = 1000L - 100L - 1L;
-            driver.setTime(time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult();
+        // go back to the time before expiration
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        time = 1000L - 100L - 1L;
+        driver.setTime(time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0");
+        processor.checkAndClearProcessResult();
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+        processor.checkAndClearProcessResult("0:X0+YY0");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-            // clear (logically)
-            time = 2000L;
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.setTime(time + i);
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-            processor.checkAndClearProcessResult();
+        // clear (logically)
+        time = 2000L;
 
-            // gradually expires items in w2
-            // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.setTime(time + i);
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            time = 2000L + 100L;
-            driver.setTime(time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        // gradually expires items in w2
+        // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        time = 2000L + 100L;
+        driver.setTime(time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
+        processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("3:XX3+Y3");
+        processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult();
+        processor.checkAndClearProcessResult("3:XX3+Y3");
 
-            // go back to the time before expiration
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            time = 2000L - 100L - 1L;
-            driver.setTime(time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult();
+        // go back to the time before expiration
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        time = 2000L - 100L - 1L;
+        driver.setTime(time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0");
+        processor.checkAndClearProcessResult();
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
+        processor.checkAndClearProcessResult("0:XX0+Y0");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
 
-        } finally {
-            Utils.delete(baseDir);
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-    }
 
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 65226d3..65a4b54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -39,245 +41,240 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamKStreamLeftJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
 
-    @Test
-    public void testLeftJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
 
-            KStreamBuilder builder = new KStreamBuilder();
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KStream<Integer, String> stream1;
-            KStream<Integer, String> stream2;
-            KStream<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+    @Test
+    public void testLeftJoin() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(intSerde, stringSerde, topic1);
-            stream2 = builder.stream(intSerde, stringSerde, topic2);
-            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
-            joined.process(processor);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStream<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        processor = new MockProcessorSupplier<>();
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+        joined.process(processor);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            // push two items to the primary stream. the other window is empty
-            // w {}
-            // --> w = {}
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        // push two items to the primary stream. the other window is empty
+        // w {}
+        // --> w = {}
 
-            // push two items to the other stream. this should produce two items.
-            // w {}
-            // --> w = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
-            processor.checkAndClearProcessResult();
+        // push two items to the other stream. this should produce two items.
+        // w {}
+        // --> w = { 0:Y0, 1:Y1 }
 
-            // push all four items to the primary stream. this should produce four items.
-            // w = { 0:Y0, 1:Y1 }
-            // --> w = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        // push all four items to the primary stream. this should produce four items.
+        // w = { 0:Y0, 1:Y1 }
+        // --> w = { 0:Y0, 1:Y1 }
 
-            // push all items to the other stream. this should produce no items.
-            // w = { 0:Y0, 1:Y1 }
-            // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
-            processor.checkAndClearProcessResult();
+        // push all items to the other stream. this should produce no items.
+        // w = { 0:Y0, 1:Y1 }
+        // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-            // push all four items to the primary stream. this should produce four items.
-            // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-            // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+        // push all four items to the primary stream. this should produce four items.
+        // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+        // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
     }
 
     @Test
     public void testWindowing() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            long time = 0L;
-
-            KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KStream<Integer, String> stream1;
-            KStream<Integer, String> stream2;
-            KStream<Integer, String> joined;
-            MockProcessorSupplier<Integer, String> processor;
+        long time = 0L;
 
-            processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(intSerde, stringSerde, topic1);
-            stream2 = builder.stream(intSerde, stringSerde, topic2);
-            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
-            joined.process(processor);
+        KStream<Integer, String> stream1;
+        KStream<Integer, String> stream2;
+        KStream<Integer, String> joined;
+        MockProcessorSupplier<Integer, String> processor;
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        processor = new MockProcessorSupplier<>();
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+        joined.process(processor);
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(time);
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            // push two items to the primary stream. the other window is empty. this should produce two items
-            // w = {}
-            // --> w = {}
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(time);
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        // push two items to the primary stream. the other window is empty. this should produce two items
+        // w = {}
+        // --> w = {}
 
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
-            // push two items to the other stream. this should produce no items.
-            // w = {}
-            // --> w = { 0:Y0, 1:Y1 }
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
-            processor.checkAndClearProcessResult();
+        // push two items to the other stream. this should produce no items.
+        // w = {}
+        // --> w = { 0:Y0, 1:Y1 }
 
-            // clear logically
-            time = 1000L;
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            // push all items to the other stream. this should produce no items.
-            // w = {}
-            // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.setTime(time + i);
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult();
+        // clear logically
+        time = 1000L;
 
-            // gradually expire items in window.
-            // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+        // push all items to the other stream. this should produce no items.
+        // w = {}
+        // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.setTime(time + i);
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            time = 1000L + 100L;
-            driver.setTime(time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        // gradually expire items in window.
+        // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        time = 1000L + 100L;
+        driver.setTime(time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
 
-            // go back to the time before expiration
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            time = 1000L - 100L - 1L;
-            driver.setTime(time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+        // go back to the time before expiration
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        time = 1000L - 100L - 1L;
+        driver.setTime(time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
 
-            driver.setTime(++time);
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
 
-        } finally {
-            Utils.delete(baseDir);
+        driver.setTime(++time);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-    }
 
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 3acb59a..2c6108b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -20,19 +20,20 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -42,111 +43,105 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamKTableLeftJoinTest {
 
-    private String topic1 = "topic1";
-    private String topic2 = "topic2";
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
-    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
         }
-    };
+        driver = null;
+    }
 
-    private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper =
-        new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-            @Override
-            public KeyValue<Integer, String> apply(Integer key, String value) {
-                return KeyValue.pair(key, value);
-            }
-        };
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
 
     @Test
     public void testJoin() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-
-            KStreamBuilder builder = new KStreamBuilder();
-
-            final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-            KStream<Integer, String> stream;
-            KTable<Integer, String> table;
-            MockProcessorSupplier<Integer, String> processor;
+        KStreamBuilder builder = new KStreamBuilder();
 
-            processor = new MockProcessorSupplier<>();
-            stream = builder.stream(intSerde, stringSerde, topic1);
-            table = builder.table(intSerde, stringSerde, topic2);
-            stream.leftJoin(table, joiner).process(processor);
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        KStream<Integer, String> stream;
+        KTable<Integer, String> table;
+        MockProcessorSupplier<Integer, String> processor;
 
-            assertEquals(1, copartitionGroups.size());
-            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+        processor = new MockProcessorSupplier<>();
+        stream = builder.stream(intSerde, stringSerde, topic1);
+        table = builder.table(intSerde, stringSerde, topic2);
+        stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
-            driver.setTime(0L);
+        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-            // push two items to the primary stream. the other table is empty
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
 
-            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+        // push two items to the primary stream. the other table is empty
 
-            // push two items to the other stream. this should not produce any item.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
-            processor.checkAndClearProcessResult();
+        // push two items to the other stream. this should not produce any item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push all items to the other stream. this should not produce any item
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
-            }
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            processor.checkAndClearProcessResult();
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
-            // push all four items to the primary stream. this should produce four items.
+        // push all items to the other stream. this should not produce any item
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+        // push all four items to the primary stream. this should produce four items.
 
-            // push two items with null to the other stream as deletes. this should not produce any item.
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
 
-            for (int i = 0; i < 2; i++) {
-                driver.process(topic2, expectedKeys[i], null);
-            }
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-            processor.checkAndClearProcessResult();
+        // push two items with null to the other stream as deletes. this should not produce any item.
 
-            // push all four items to the primary stream. this should produce four items.
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
 
-            for (int i = 0; i < expectedKeys.length; i++) {
-                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
-            }
+        processor.checkAndClearProcessResult();
 
-            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+        // push all four items to the primary stream. this should produce four items.
 
-        } finally {
-            Utils.delete(baseDir);
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+
+        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
     }
 
     @Test(expected = KafkaException.class)
@@ -158,10 +153,10 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(intSerde, stringSerde, topic1).map(keyValueMapper);
+        stream = builder.stream(intSerde, stringSerde, topic1).map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
         table = builder.table(intSerde, stringSerde, topic2);
 
-        stream.leftJoin(table, joiner).process(processor);
+        stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 68fa656..00e5d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -36,6 +37,16 @@ public class KStreamMapTest {
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testMap() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -56,7 +67,7 @@ public class KStreamMapTest {
         processor = new MockProcessorSupplier<>();
         stream.map(mapper).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index e671aab..e48b677 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,6 +36,16 @@ public class KStreamMapValuesTest {
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
+    private KStreamTestDriver driver;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
     @Test
     public void testFlatMapValues() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -54,7 +65,7 @@ public class KStreamMapValuesTest {
         stream = builder.stream(intSerde, stringSerde, topicName);
         stream.mapValues(mapper).process(processor);
 
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topicName, expectedKeys[i], Integer.toString(expectedKeys[i]));
         }