You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/07 16:21:25 UTC

[kafka] branch trunk updated: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [cleanup] (#4939)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f641fe  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [cleanup] (#4939)
6f641fe is described below

commit 6f641fef6a88036ab4dacb59ab21bc8b21ef9bcf
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Mon May 7 17:21:20 2018 +0100

    KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [cleanup] (#4939)
    
    * Add method to create test properties to StreamsTestUtils
    * Make TopologyTestDriver protected constructor package-private
    * Add comment suggesting the use of TopologyTestDriver to KStreamTestDriver
    * Cleanup:
        - GlobalKTableJoinsTest
        - KGroupedStreamImplTest
        - KGroupedTableImplTest
        - KStreamBranchTest
        - KStreamFilterTest
        - KStreamFlatMapTest
        - KStreamFlatMapValuesTest
        - KStreamForeachTest
        - KStreamGlobalKTableJoinTest
        - KStreamGlobalKTableLeftJoinTest
        - KStreamImplTest
        - KStreamKStreamJoinTest
        - KStreamKStreamLeftJoinTest
        - KStreamGlobalKTableLeftJoinTest
        - KStreamKTableJoinTest
        - KStreamKTableLeftJoinTest
        - KStreamMapTest
        - KStreamMapValuesTest
        - KStreamPeekTest
        - StreamsBuilderTest
        - KStreamSelectKeyTest
        - KStreamTransformTest
        - KStreamTransformValuesTest
        - KStreamWindowAggregateTest
        - KTableForeachTest
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/streams/StreamsBuilderTest.java   |  87 +--
 .../kstream/internals/GlobalKTableJoinsTest.java   |  39 +-
 .../kstream/internals/KGroupedStreamImplTest.java  | 178 +++--
 .../kstream/internals/KGroupedTableImplTest.java   |  93 ++-
 .../kstream/internals/KStreamBranchTest.java       |  33 +-
 .../kstream/internals/KStreamFilterTest.java       |  42 +-
 .../kstream/internals/KStreamFlatMapTest.java      |  33 +-
 .../internals/KStreamFlatMapValuesTest.java        |  44 +-
 .../kstream/internals/KStreamForeachTest.java      |  39 +-
 .../internals/KStreamGlobalKTableJoinTest.java     |  25 +-
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  24 +-
 .../streams/kstream/internals/KStreamImplTest.java | 129 ++--
 .../kstream/internals/KStreamKStreamJoinTest.java  | 792 ++++++++++-----------
 .../internals/KStreamKStreamLeftJoinTest.java      | 365 +++++-----
 .../kstream/internals/KStreamKTableJoinTest.java   |  24 +-
 .../internals/KStreamKTableLeftJoinTest.java       |  22 +-
 .../streams/kstream/internals/KStreamMapTest.java  |  43 +-
 .../kstream/internals/KStreamMapValuesTest.java    |  54 +-
 .../streams/kstream/internals/KStreamPeekTest.java |  52 +-
 .../kstream/internals/KStreamSelectKeyTest.java    |  39 +-
 .../kstream/internals/KStreamTransformTest.java    |  50 +-
 .../internals/KStreamTransformValuesTest.java      |  52 +-
 .../internals/KStreamWindowAggregateTest.java      | 254 +++----
 .../kstream/internals/KTableForeachTest.java       |  43 +-
 .../org/apache/kafka/test/KStreamTestDriver.java   |   5 +
 .../org/apache/kafka/test/StreamsTestUtils.java    |  23 +
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +-
 27 files changed, 1084 insertions(+), 1502 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 15e55d8..7c2bfa6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -35,9 +35,7 @@ import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockPredicate;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -58,26 +56,7 @@ import static org.junit.Assert.assertFalse;
 public class StreamsBuilderTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-builder-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test(expected = TopologyException.class)
     public void testFrom() {
@@ -183,10 +162,10 @@ public class StreamsBuilderTest {
 
         source.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+            driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        }
 
         // no exception was thrown
         assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
@@ -203,10 +182,10 @@ public class StreamsBuilderTest {
         source.process(sourceProcessorSupplier);
         through.process(throughProcessorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+            driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        }
 
         assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
         assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
@@ -224,13 +203,13 @@ public class StreamsBuilderTest {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
-        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
-        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
-        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+            driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
     }
@@ -250,17 +229,17 @@ public class StreamsBuilderTest {
                 .withValueSerde(Serdes.String()))
                 .toStream().foreach(action);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
         final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
-        driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
-
-        final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
-        assertThat(store.get(1L), equalTo("value1"));
-        assertThat(store.get(2L), equalTo("value2"));
-        assertThat(results.get(1L), equalTo("value1"));
-        assertThat(results.get(2L), equalTo("value2"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+            driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+
+            final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+            assertThat(store.get(1L), equalTo("value1"));
+            assertThat(store.get(2L), equalTo("value2"));
+            assertThat(results.get(1L), equalTo("value1"));
+            assertThat(results.get(2L), equalTo("value2"));
+        }
     }
 
     @Test
@@ -270,15 +249,15 @@ public class StreamsBuilderTest {
                 .withKeySerde(Serdes.Long())
                 .withValueSerde(Serdes.String()));
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
         final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
-        driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
-        final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+            driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+            final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
 
-        assertThat(store.get(1L), equalTo("value1"));
-        assertThat(store.get(2L), equalTo("value2"));
+            assertThat(store.get(1L), equalTo("value1"));
+            assertThat(store.get(2L), equalTo("value2"));
+        }
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 8c50afe..da8b102 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -28,8 +27,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -50,7 +48,6 @@ public class GlobalKTableJoinsTest {
     private KStream<String, String> stream;
     private KeyValueMapper<String, String, String> keyValueMapper;
     private ForeachAction<String, String> action;
-    private TopologyTestDriver driver;
 
 
     @Before
@@ -72,14 +69,6 @@ public class GlobalKTableJoinsTest {
         };
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void shouldLeftJoinWithStream() {
         stream
@@ -110,21 +99,17 @@ public class GlobalKTableJoinsTest {
 
     private void verifyJoin(final Map<String, String> expected) {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "global-ktable-joins-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        // write some data to the global table
-        driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
-        driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
-        //write some data to the stream
-        driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
-        driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
-        driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            // write some data to the global table
+            driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
+            driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
+            //write some data to the stream
+            driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
+            driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
+            driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
+        }
 
         assertEquals(expected, results);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index b9ca30f..e7a9226 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -50,8 +49,7 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -78,28 +76,13 @@ public class KGroupedStreamImplTest {
     private KGroupedStream<String, String> groupedStream;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
 
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-stream-impl-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
     }
 
     @SuppressWarnings("deprecation")
@@ -224,13 +207,14 @@ public class KGroupedStreamImplTest {
     }
 
     private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        }
         assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -298,13 +282,14 @@ public class KGroupedStreamImplTest {
     }
 
     private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        }
         assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -341,13 +326,14 @@ public class KGroupedStreamImplTest {
     }
 
     private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+        }
         assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -554,26 +540,30 @@ public class KGroupedStreamImplTest {
     public void shouldCountAndMaterializeResults() {
         groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
+            final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
 
-        assertThat(count.get("1"), equalTo(3L));
-        assertThat(count.get("2"), equalTo(1L));
-        assertThat(count.get("3"), equalTo(2L));
+            assertThat(count.get("1"), equalTo(3L));
+            assertThat(count.get("2"), equalTo(1L));
+            assertThat(count.get("3"), equalTo(2L));
+        }
     }
 
     @Test
     public void shouldLogAndMeasureSkipsInAggregate() {
         groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        processData();
-        LogCaptureAppender.unregister(appender);
-
-        final Map<MetricName, ? extends Metric> metrics = driver.metrics();
-        assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
-        assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            LogCaptureAppender.unregister(appender);
+
+            final Map<MetricName, ? extends Metric> metrics = driver.metrics();
+            assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+            assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        }
     }
 
 
@@ -586,13 +576,15 @@ public class KGroupedStreamImplTest {
                 .withKeySerde(Serdes.String())
                 .withValueSerde(Serdes.String()));
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
+            final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
 
-        assertThat(reduced.get("1"), equalTo("A+C+D"));
-        assertThat(reduced.get("2"), equalTo("B"));
-        assertThat(reduced.get("3"), equalTo("E+F"));
+            assertThat(reduced.get("1"), equalTo("A+C+D"));
+            assertThat(reduced.get("2"), equalTo("B"));
+            assertThat(reduced.get("3"), equalTo("E+F"));
+        }
     }
 
     @Test
@@ -605,13 +597,15 @@ public class KGroupedStreamImplTest {
         );
 
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        processData();
-        LogCaptureAppender.unregister(appender);
-
-        final Map<MetricName, ? extends Metric> metrics = driver.metrics();
-        assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
-        assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            LogCaptureAppender.unregister(appender);
+
+            final Map<MetricName, ? extends Metric> metrics = driver.metrics();
+            assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+            assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        }
     }
 
 
@@ -625,13 +619,15 @@ public class KGroupedStreamImplTest {
                 .withKeySerde(Serdes.String())
                 .withValueSerde(Serdes.String()));
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
+            final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
 
-        assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
-        assertThat(aggregate.get("2"), equalTo("0+B"));
-        assertThat(aggregate.get("3"), equalTo("0+E+F"));
+            assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
+            assertThat(aggregate.get("2"), equalTo("0+B"));
+            assertThat(aggregate.get("3"), equalTo("0+E+F"));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -649,15 +645,16 @@ public class KGroupedStreamImplTest {
                 }
             });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        assertThat(results.get("1"), equalTo("0+A+C+D"));
-        assertThat(results.get("2"), equalTo("0+B"));
-        assertThat(results.get("3"), equalTo("0+E+F"));
+            assertThat(results.get("1"), equalTo("0+A+C+D"));
+            assertThat(results.get("2"), equalTo("0+B"));
+            assertThat(results.get("3"), equalTo("0+E+F"));
+        }
     }
 
-    private void processData() {
-        driver = new TopologyTestDriver(builder.build(), props);
+    private void processData(final TopologyTestDriver driver) {
         driver.pipeInput(recordFactory.create(TOPIC, "1", "A"));
         driver.pipeInput(recordFactory.create(TOPIC, "2", "B"));
         driver.pipeInput(recordFactory.create(TOPIC, "1", "C"));
@@ -668,22 +665,23 @@ public class KGroupedStreamImplTest {
     }
 
     private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
-        driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
+            driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+        }
         assertThat(results, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
-            KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
         )));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 742f349..05d339f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
@@ -39,8 +38,7 @@ import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -60,29 +58,13 @@ public class KGroupedTableImplTest {
     private final StreamsBuilder builder = new StreamsBuilder();
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
     private final String topic = "input";
 
     @Before
     public void before() {
         groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
                 .groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
-
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-table-impl-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
     }
 
     @Test
@@ -140,30 +122,31 @@ public class KGroupedTableImplTest {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (StateStoreSupplier<KeyValueStore>) null);
     }
 
-    private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) {
-        final Map<String, Integer> results = new HashMap<>();
-        final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
-        reduced.foreach(new ForeachAction<String, Integer>() {
+    private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
+        final Map<String, Integer> reducedResults = new HashMap<>();
+        inputKTable.foreach(new ForeachAction<String, Integer>() {
             @Override
             public void apply(final String key, final Integer value) {
-                results.put(key, value);
+                reducedResults.put(key, value);
             }
         });
-
-        driver = new TopologyTestDriver(builder.build(), props);
+        return reducedResults;
+    }
+    private void assertReduced(final Map<String, Integer> reducedResults, final String topic, final TopologyTestDriver driver) {
+        final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
         driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
         driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
 
-        assertEquals(Integer.valueOf(1), results.get("A"));
-        assertEquals(Integer.valueOf(2), results.get("B"));
+        assertEquals(Integer.valueOf(1), reducedResults.get("A"));
+        assertEquals(Integer.valueOf(2), reducedResults.get("B"));
 
         driver.pipeInput(recordFactory.create(topic, "A", 2.6, 10));
         driver.pipeInput(recordFactory.create(topic, "B", 1.3, 10));
         driver.pipeInput(recordFactory.create(topic, "A", 5.7, 10));
         driver.pipeInput(recordFactory.create(topic, "B", 6.2, 10));
 
-        assertEquals(Integer.valueOf(5), results.get("A"));
-        assertEquals(Integer.valueOf(6), results.get("B"));
+        assertEquals(Integer.valueOf(5), reducedResults.get("A"));
+        assertEquals(Integer.valueOf(6), reducedResults.get("B"));
     }
 
     @Test
@@ -184,8 +167,11 @@ public class KGroupedTableImplTest {
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
 
-        doShouldReduce(reduced, topic);
-        assertEquals(reduced.queryableStoreName(), "reduced");
+        final Map<String, Integer> results = getReducedResults(reduced);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertReduced(results, topic, driver);
+            assertEquals(reduced.queryableStoreName(), "reduced");
+        }
     }
 
     @Test
@@ -206,8 +192,11 @@ public class KGroupedTableImplTest {
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
 
-        doShouldReduce(reduced, topic);
-        assertNull(reduced.queryableStoreName());
+        final Map<String, Integer> results = getReducedResults(reduced);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertReduced(results, topic, driver);
+            assertNull(reduced.queryableStoreName());
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -229,10 +218,13 @@ public class KGroupedTableImplTest {
                                 .withKeySerde(Serdes.String())
                                 .withValueSerde(Serdes.Integer()));
 
-        doShouldReduce(reduced, topic);
-        final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
-        assertThat(reduce.get("A"), equalTo(5));
-        assertThat(reduce.get("B"), equalTo(6));
+        final Map<String, Integer> results = getReducedResults(reduced);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertReduced(results, topic, driver);
+            final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
+            assertThat(reduce.get("A"), equalTo(5));
+            assertThat(reduce.get("B"), equalTo(6));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -246,10 +238,12 @@ public class KGroupedTableImplTest {
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));
 
-        processData(topic);
-        final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
-        assertThat(counts.get("1"), equalTo(3L));
-        assertThat(counts.get("2"), equalTo(2L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(topic, driver);
+            final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
+            assertThat(counts.get("1"), equalTo(3L));
+            assertThat(counts.get("2"), equalTo(2L));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -266,10 +260,12 @@ public class KGroupedTableImplTest {
                                    .withValueSerde(Serdes.String())
                                    .withKeySerde(Serdes.String()));
 
-        processData(topic);
-        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
-        assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
-        assertThat(aggregate.get("2"), equalTo("0+2+2"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(topic, driver);
+            final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
+            assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
+            assertThat(aggregate.get("2"), equalTo("0+2+2"));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -327,8 +323,7 @@ public class KGroupedTableImplTest {
                                (Materialized) null);
     }
 
-    private void processData(final String topic) {
-        driver = new TopologyTestDriver(builder.build(), props);
+    private void processData(final String topic, final TopologyTestDriver driver) {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         driver.pipeInput(recordFactory.create(topic, "A", "1"));
         driver.pipeInput(recordFactory.create(topic, "B", "1"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index bd3d60b..2aa8aac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -21,16 +21,13 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.List;
@@ -42,26 +39,7 @@ public class KStreamBranchTest {
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-branch-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @SuppressWarnings("unchecked")
     @Test
@@ -102,9 +80,10 @@ public class KStreamBranchTest {
             branches[i].process(supplier);
         }
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         final List<MockProcessor<Integer, String>> processors = supplier.capturedProcessors(3);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index d338fe3..51a994b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -21,15 +21,12 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -40,28 +37,9 @@ public class KStreamFilterTest {
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-filter-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-  
-    private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
+    private final Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
         public boolean test(Integer key, String value) {
             return (key % 3) == 0;
@@ -80,9 +58,10 @@ public class KStreamFilterTest {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filter(isMultipleOfThree).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(2, supplier.theCapturedProcessor().processed.size());
@@ -100,9 +79,10 @@ public class KStreamFilterTest {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filterNot(isMultipleOfThree).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(5, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 9ce24b5..3173dcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -22,15 +22,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -42,26 +39,7 @@ public class KStreamFlatMapTest {
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMap() {
@@ -88,9 +66,10 @@ public class KStreamFlatMapTest {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.flatMap(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(6, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 221b02b..471b127 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -20,16 +20,13 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -41,26 +38,7 @@ public class KStreamFlatMapValuesTest {
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-values-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
@@ -83,10 +61,11 @@ public class KStreamFlatMapValuesTest {
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         stream.flatMapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (final int expectedKey : expectedKeys) {
-            // passing the timestamp to recordFactory.create to disambiguate the call
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (final int expectedKey : expectedKeys) {
+                // passing the timestamp to recordFactory.create to disambiguate the call
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+            }
         }
 
         String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
@@ -117,10 +96,11 @@ public class KStreamFlatMapValuesTest {
 
         stream.flatMapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (final int expectedKey : expectedKeys) {
-            // passing the timestamp to recordFactory.create to disambiguate the call
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (final int expectedKey : expectedKeys) {
+                // passing the timestamp to recordFactory.create to disambiguate the call
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+            }
         }
 
         String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index b975c96..83a20a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -17,20 +17,16 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -45,29 +41,7 @@ public class KStreamForeachTest {
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-foreach-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Serde<String> stringSerde = Serdes.String();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testForeach() {
@@ -97,13 +71,14 @@ public class KStreamForeachTest {
 
         // When
         StreamsBuilder builder = new StreamsBuilder();
-        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.foreach(action);
 
         // Then
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (KeyValue<Integer, String> record: inputRecords) {
-            driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (KeyValue<Integer, String> record : inputRecords) {
+                driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+            }
         }
 
         assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 6e5b816..c37e8a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -17,22 +17,20 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,8 +45,6 @@ public class KStreamGlobalKTableJoinTest {
 
     private final String streamTopic = "streamTopic";
     private final String globalTableTopic = "globalTableTopic";
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Serde<String> stringSerde = Serdes.String();
     private TopologyTestDriver driver;
     private MockProcessor<Integer, String> processor;
     private final int[] expectedKeys = {0, 1, 2, 3};
@@ -63,8 +59,8 @@ public class KStreamGlobalKTableJoinTest {
         final KeyValueMapper<Integer, String, String> keyMapper;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
-        final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+        final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
         table = builder.globalTable(globalTableTopic, tableConsumed);
         keyMapper = new KeyValueMapper<Integer, String, String>() {
@@ -78,13 +74,7 @@ public class KStreamGlobalKTableJoinTest {
         };
         stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
@@ -92,10 +82,7 @@ public class KStreamGlobalKTableJoinTest {
 
     @After
     public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
+        driver.close();
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index b3551ba..eb0775a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -17,13 +17,11 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -32,7 +30,8 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,8 +45,6 @@ public class KStreamGlobalKTableLeftJoinTest {
 
     final private String streamTopic = "streamTopic";
     final private String globalTableTopic = "globalTableTopic";
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
 
     private MockProcessor<Integer, String> processor;
     private TopologyTestDriver driver;
@@ -64,8 +61,8 @@ public class KStreamGlobalKTableLeftJoinTest {
         final KeyValueMapper<Integer, String, String> keyMapper;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
-        final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+        final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
         table = builder.globalTable(globalTableTopic, tableConsumed);
         keyMapper = new KeyValueMapper<Integer, String, String>() {
@@ -79,18 +76,17 @@ public class KStreamGlobalKTableLeftJoinTest {
         };
         stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
     }
 
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
     private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
         final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 797575d..49e8aaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -24,7 +23,6 @@ import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -49,8 +47,7 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -69,48 +66,28 @@ import static org.junit.Assert.fail;
 
 public class KStreamImplTest {
 
-    private final Serde<String> stringSerde = Serdes.String();
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
-
     private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
 
     private KStream<String, String> testStream;
     private StreamsBuilder builder;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
         builder = new StreamsBuilder();
         testStream = builder.stream("source");
-
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-impl-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
     }
 
     @Test
     public void testNumProcesses() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
+        KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
 
-        KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
+        KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
 
         KStream<String, String> stream1 =
             source1.filter(new Predicate<String, String>() {
@@ -170,7 +147,7 @@ public class KStreamImplTest {
         );
 
         final int anyWindowSize = 1;
-        final Joined<String, Integer, Integer> joined = Joined.with(stringSerde, intSerde, intSerde);
+        final Joined<String, Integer, Integer> joined = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
         KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
@@ -205,9 +182,8 @@ public class KStreamImplTest {
     @Test
     public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-        KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
-        KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
+        KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
+        KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
 
         stream1.to("topic-5");
         stream2.through("topic-6");
@@ -224,11 +200,12 @@ public class KStreamImplTest {
     public void shouldSendDataThroughTopicUsingProduced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
-        final KStream<String, String> stream = builder.stream(input, consumed);
-        stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
+        final KStream<String, String> stream = builder.stream(input, stringConsumed);
+        stream.through("through-topic", Produced.with(Serdes.String(), Serdes.String())).process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(input, "a", "b"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(input, "a", "b"));
+        }
         assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("a:b")));
     }
 
@@ -236,12 +213,13 @@ public class KStreamImplTest {
     public void shouldSendDataToTopicUsingProduced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
-        final KStream<String, String> stream = builder.stream(input, consumed);
-        stream.to("to-topic", Produced.with(stringSerde, stringSerde));
-        builder.stream("to-topic", consumed).process(processorSupplier);
+        final KStream<String, String> stream = builder.stream(input, stringConsumed);
+        stream.to("to-topic", Produced.with(Serdes.String(), Serdes.String()));
+        builder.stream("to-topic", stringConsumed).process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(input, "e", "f"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(input, "e", "f"));
+        }
         assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f")));
     }
 
@@ -249,7 +227,7 @@ public class KStreamImplTest {
     // TODO: this test should be refactored when we removed KStreamBuilder so that the created Topology contains internal topics as well
     public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
         final KStreamBuilder builder = new KStreamBuilder();
-        KStream<String, String> kStream = builder.stream(stringSerde, stringSerde, "topic-1");
+        KStream<String, String> kStream = builder.stream(Serdes.String(), Serdes.String(), "topic-1");
         ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
         long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
         final KStream<String, String> stream = kStream
@@ -282,9 +260,8 @@ public class KStreamImplTest {
     @Test
     public void testToWithNullValueSerdeDoesntNPE() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-        final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), consumed);
-        inputStream.to(stringSerde, null, "output");
+        final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), stringConsumed);
+        inputStream.to(Serdes.String(), null, "output");
     }
 
     @Test(expected = NullPointerException.class)
@@ -477,7 +454,7 @@ public class KStreamImplTest {
 
     @Test
     public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
-        final KTable<String, String> table = builder.table("blah", consumed);
+        final KTable<String, String> table = builder.table("blah", stringConsumed);
         try {
             testStream.leftJoin(table,
                                 MockValueJoiner.TOSTRING_JOINER,
@@ -490,7 +467,7 @@ public class KStreamImplTest {
 
     @Test
     public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
-        final KTable<String, String> table = builder.table("blah", consumed);
+        final KTable<String, String> table = builder.table("blah", stringConsumed);
         try {
             testStream.join(table,
                             MockValueJoiner.TOSTRING_JOINER,
@@ -522,12 +499,12 @@ public class KStreamImplTest {
 
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
-        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
-        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
-        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+            driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
     }
@@ -547,16 +524,16 @@ public class KStreamImplTest {
 
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
-        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
-        driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
-        driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
-        driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
-        driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
-        driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
-        driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+            driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
+            driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
+            driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
+            driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
+            driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
+            driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
                      processorSupplier.theCapturedProcessor().processed);
@@ -568,13 +545,13 @@ public class KStreamImplTest {
 
         pattern2Source.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
-        driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
-        driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
-        driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
-        driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+            driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+            driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
+            driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
+            driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
                 processorSupplier.theCapturedProcessor().processed);
@@ -591,13 +568,13 @@ public class KStreamImplTest {
 
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
-        driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
-        driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
-        driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
-        driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+            driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+            driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
+            driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
+            driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
                 processorSupplier.theCapturedProcessor().processed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 5d849ee..de3446c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -17,26 +17,22 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -55,38 +51,16 @@ public class KStreamKStreamJoinTest {
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-
-    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setUp() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, Integer> left = builder.stream("left", Consumed.with(stringSerde, intSerde));
-        final KStream<String, Integer> right = builder.stream("right", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
         final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
 
         left.join(
@@ -98,17 +72,18 @@ public class KStreamKStreamJoinTest {
                 }
             },
             JoinWindows.of(100),
-            Joined.with(stringSerde, intSerde, intSerde)
+            Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
         );
 
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create("left", "A", null));
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create("left", "A", null));
+            LogCaptureAppender.unregister(appender);
 
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
 
-        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        }
     }
 
     @Test
@@ -127,7 +102,7 @@ public class KStreamKStreamJoinTest {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -135,81 +110,82 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        // push two items to the primary stream. the other window is empty
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = {}
+            // push two items to the primary stream. the other window is empty
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // push all four items to the primary stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push all four items to the primary stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // push all items to the other stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // push all four items to the primary stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
-        // push two items to the other stream. this should produce six item.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            // push two items to the other stream. this should produce six item.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+        }
     }
 
     @Test
@@ -229,88 +205,89 @@ public class KStreamKStreamJoinTest {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        // push two items to the primary stream. the other window is empty.this should produce two items
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = {}
+            // push two items to the primary stream. the other window is empty.this should produce two items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // push all four items to the primary stream. this should produce four items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push all four items to the primary stream. this should produce four items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
-        // push all items to the other stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // push all four items to the primary stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
-        // push two items to the other stream. this should produce six item.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            // push two items to the other stream. this should produce six item.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+        }
     }
 
     @Test
@@ -332,7 +309,7 @@ public class KStreamKStreamJoinTest {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -340,197 +317,198 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
 
-        // push two items to the primary stream. the other window is empty. this should produce no items.
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = {}
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
-        }
+            // push two items to the primary stream. the other window is empty. this should produce no items.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+            }
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // clear logically
-        time = 1000L;
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
+            // clear logically
+            time = 1000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
 
-        // gradually expires items in w1
-        // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+            // gradually expires items in w1
+            // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
 
-        time += 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("3:X3+YY3");
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // go back to the time before expiration
+            // go back to the time before expiration
 
-        time = 1000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time = 1000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0");
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        time += 1;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        time += 1;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // clear (logically)
-        time = 2000L;
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
-        }
+            // clear (logically)
+            time = 2000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // gradually expires items in w2
-        // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+            // gradually expires items in w2
+            // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
-        time = 2000L + 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time = 2000L + 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("3:XX3+Y3");
+            processor.checkAndClearProcessResult("3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // go back to the time before expiration
+            // go back to the time before expiration
 
-        time = 2000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time = 2000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0");
+            processor.checkAndClearProcessResult("0:XX0+Y0");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        }
     }
 
     @Test
@@ -552,9 +530,9 @@ public class KStreamKStreamJoinTest {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(0).after(100),
-            Joined.with(intSerde,
-                stringSerde,
-                stringSerde));
+            Joined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -562,85 +540,85 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
 
+            time = 1000L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult();
 
-        processor.checkAndClearProcessResult();
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
-        processor.checkAndClearProcessResult("0:X0+YY0");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time = 1000 + 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000 + 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
-        processor.checkAndClearProcessResult("3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            processor.checkAndClearProcessResult();
         }
-
-        processor.checkAndClearProcessResult();
     }
 
     @Test
@@ -663,7 +641,7 @@ public class KStreamKStreamJoinTest {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(0).before(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -671,84 +649,84 @@ public class KStreamKStreamJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
 
+            time = 1000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult();
 
-        processor.checkAndClearProcessResult();
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
-        processor.checkAndClearProcessResult("0:X0+YY0");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time = 1000L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
-        processor.checkAndClearProcessResult("3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            processor.checkAndClearProcessResult();
         }
-
-        processor.checkAndClearProcessResult();
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index c67e13d..11c5c5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -17,24 +17,20 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -50,30 +46,9 @@ public class KStreamKStreamLeftJoinTest {
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private Properties props = new Properties();
-
-    @Before
-    public void setUp() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-left-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testLeftJoin() {
@@ -91,7 +66,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2,
                                   MockValueJoiner.TOSTRING_JOINER,
                                   JoinWindows.of(100),
-                                  Joined.with(intSerde, stringSerde, stringSerde));
+                                  Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -99,65 +74,66 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-
-        // push two items to the primary stream. the other window is empty
-        // w1 {}
-        // w2 {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = {}
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
-        }
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = { 0:Y0, 1:Y1 }
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
-        }
-
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-
-        // push three items to the primary stream. this should produce four items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // --> w2 = { 0:Y0, 1:Y1 }
-
-        for (int i = 0; i < 3; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            // push two items to the primary stream. the other window is empty
+            // w1 {}
+            // w2 {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+            }
+
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+            // push three items to the primary stream. this should produce four items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // --> w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 3; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
+
+            // push all items to the other stream. this should produce 5 items
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+            }
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
+
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
         }
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
-
-        // push all items to the other stream. this should produce 5 items
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
-        }
-        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
-
-        // push all four items to the primary stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
     }
 
     @Test
@@ -176,7 +152,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2,
                                   MockValueJoiner.TOSTRING_JOINER,
                                   JoinWindows.of(100),
-                                  Joined.with(intSerde, stringSerde, stringSerde));
+                                  Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -184,111 +160,112 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
-
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-
-        // push two items to the primary stream. the other window is empty. this should produce two items
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = {}
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
-        }
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
-        // push two items to the other stream. this should produce no items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = { 0:Y0, 1:Y1 }
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
-        }
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-
-        // clear logically
-        time = 1000L;
-
-        // push all items to the other stream. this should produce no items.
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = {}
-        // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
-
-        // gradually expire items in window 2.
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = {}
-        // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-
-        time = 1000L + 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
-        // go back to the time before expiration
-
-        time = 1000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
+
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            // push two items to the primary stream. the other window is empty. this should produce two items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+            }
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should produce no items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
+            }
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+            // clear logically
+            time = 1000L;
+
+            // push all items to the other stream. this should produce no items.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = {}
+            // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
+
+            // gradually expire items in window 2.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = {}
+            // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+
+            time = 1000L + 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            // go back to the time before expiration
+
+            time = 1000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
         }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index ec31b5a..0ce27ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -17,22 +17,21 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,8 +51,6 @@ public class KStreamKTableJoinTest {
     private final String streamTopic = "streamTopic";
     private final String tableTopic = "tableTopic";
 
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
 
     private final int[] expectedKeys = {0, 1, 2, 3};
@@ -70,23 +67,22 @@ public class KStreamKTableJoinTest {
         final KTable<Integer, String> table;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
         stream = builder.stream(streamTopic, consumed);
         table = builder.table(tableTopic, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
     }
 
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
     private void pushToStream(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
             driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 735f71c..eedda07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -17,13 +17,11 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -31,7 +29,8 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -48,8 +47,6 @@ public class KStreamKTableLeftJoinTest {
     final private String streamTopic = "streamTopic";
     final private String tableTopic = "tableTopic";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
     private TopologyTestDriver driver;
     private MockProcessor<Integer, String> processor;
@@ -66,23 +63,22 @@ public class KStreamKTableLeftJoinTest {
         final KTable<Integer, String> table;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
         stream = builder.stream(streamTopic, consumed);
         table = builder.table(tableTopic, consumed);
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
     }
 
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
     private void pushToStream(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
             driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index b0a383b..b55d8e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -17,21 +17,17 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -41,30 +37,8 @@ import static org.junit.Assert.assertEquals;
 public class KStreamMapTest {
 
     private String topicName = "topic";
-
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMap() {
@@ -80,15 +54,14 @@ public class KStreamMapTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
-        MockProcessorSupplier<String, Integer> supplier;
-
-        supplier = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.map(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(4, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index ed11038..95593aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -17,21 +17,17 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -41,33 +37,9 @@ import static org.junit.Assert.assertArrayEquals;
 public class KStreamMapValuesTest {
 
     private String topicName = "topic";
-
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
-
-
+    private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-values-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
@@ -83,13 +55,13 @@ public class KStreamMapValuesTest {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        KStream<Integer, String> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.mapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+            }
         }
         String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
 
@@ -110,13 +82,13 @@ public class KStreamMapValuesTest {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        KStream<Integer, String> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.mapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+            }
         }
         String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 2c6ff81..137aa6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -17,20 +17,16 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -43,53 +39,33 @@ import static org.junit.Assert.fail;
 public class KStreamPeekTest {
 
     private final String topicName = "topic";
-    private final Serde<Integer> intSerd = Serdes.Integer();
-    private final Serde<String> stringSerd = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-peek-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void shouldObserveStreamElements() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
+        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
         stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        final List<KeyValue<Integer, String>> expected = new ArrayList<>();
-        for (int key = 0; key < 32; key++) {
-            final String value = "V" + key;
-            driver.pipeInput(recordFactory.create(topicName, key, value));
-            expected.add(new KeyValue<>(key, value));
-        }
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final List<KeyValue<Integer, String>> expected = new ArrayList<>();
+            for (int key = 0; key < 32; key++) {
+                final String value = "V" + key;
+                driver.pipeInput(recordFactory.create(topicName, key, value));
+                expected.add(new KeyValue<>(key, value));
+            }
 
-        assertEquals(expected, peekObserved);
-        assertEquals(expected, streamObserved);
+            assertEquals(expected, peekObserved);
+            assertEquals(expected, streamObserved);
+        }
     }
 
     @Test
     public void shouldNotAllowNullAction() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
+        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         try {
             stream.peek(null);
             fail("expected null action to throw NPE");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 1abc0b9..b030233 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -17,21 +17,17 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -44,29 +40,8 @@ public class KStreamSelectKeyTest {
 
     private String topicName = "topic_key_select";
 
-    final private Serde<Integer> integerSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(topicName, new StringSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-select-key-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
 
     @Test
     public void testSelectKey() {
@@ -88,16 +63,16 @@ public class KStreamSelectKeyTest {
         final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"};
         final int[] expectedValues = new int[]{1, 2, 3};
 
-        KStream<String, Integer>  stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));
+        KStream<String, Integer>  stream = builder.stream(topicName, Consumed.with(Serdes.String(), Serdes.Integer()));
 
         MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
         stream.selectKey(selector).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (int expectedValue : expectedValues) {
-            driver.pipeInput(recordFactory.create(expectedValue));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedValue : expectedValues) {
+                driver.pipeInput(recordFactory.create(expectedValue));
+            }
         }
 
         assertEquals(3, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 1567fe1..8a05aac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -17,12 +17,10 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
@@ -33,9 +31,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -47,33 +43,12 @@ public class KStreamTransformTest {
 
     private String topicName = "topic";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
 
     @Rule
     public final KStreamTestDriver kstreamDriver = new KStreamTestDriver();
 
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void testTransform() {
         StreamsBuilder builder = new StreamsBuilder();
@@ -102,7 +77,7 @@ public class KStreamTransformTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
         kstreamDriver.setUp(builder);
@@ -161,18 +136,19 @@ public class KStreamTransformTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
-        }
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+            }
 
-        // This tick will yield yields the "-1:2" result
-        driver.advanceWallClockTime(2);
-        // This tick further advances the clock to 3, which leads to the "-1:3" result
-        driver.advanceWallClockTime(1);
+            // This tick will yield yields the "-1:2" result
+            driver.advanceWallClockTime(2);
+            // This tick further advances the clock to 3, which leads to the "-1:3" result
+            driver.advanceWallClockTime(1);
+        }
 
         assertEquals(6, processor.theCapturedProcessor().processed.size());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 6bfc813..419e6f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -17,11 +17,9 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
@@ -34,9 +32,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -47,31 +43,9 @@ import static org.junit.Assert.fail;
 public class KStreamTransformValuesTest {
 
     private String topicName = "topic";
-
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
-
+    private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
 
     @Test
     public void testTransform() {
@@ -109,13 +83,13 @@ public class KStreamTransformValuesTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, Integer> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transformValues(valueTransformerSupplier).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+            }
         }
         String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
 
@@ -152,13 +126,13 @@ public class KStreamTransformValuesTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, Integer> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transformValues(valueTransformerSupplier).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+            }
         }
         String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 9050edb..7a2a8e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -16,14 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -39,9 +37,7 @@ import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.List;
@@ -54,28 +50,8 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowAggregateTest {
 
-    final private Serde<String> strSerde = Serdes.String();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-window-aggregate-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testAggBasic() {
@@ -83,31 +59,31 @@ public class KStreamWindowAggregateTest {
         final String topic1 = "topic1";
 
         final KTable<Windowed<String>, String> table2 = builder
-            .stream(topic1, Consumed.with(strSerde, strSerde))
-            .groupByKey(Serialized.with(strSerde, strSerde))
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
+            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic1-Canonized");
 
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
+        }
 
 
         assertEquals(
@@ -141,16 +117,16 @@ public class KStreamWindowAggregateTest {
         final String topic2 = "topic2";
 
         final KTable<Windowed<String>, String> table1 = builder
-            .stream(topic1, Consumed.with(strSerde, strSerde))
-            .groupByKey(Serialized.with(strSerde, strSerde))
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
+            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic1-Canonized");
 
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table1.toStream().process(supplier);
 
         final KTable<Windowed<String>, String> table2 = builder
-            .stream(topic2, Consumed.with(strSerde, strSerde)).groupByKey(Serialized.with(strSerde, strSerde))
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic2-Canonized");
+            .stream(topic2, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic2-Canonized");
 
         table2.toStream().process(supplier);
 
@@ -162,84 +138,84 @@ public class KStreamWindowAggregateTest {
             }
         }).toStream().process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
-
-        final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
-
-        processors.get(0).checkAndClearProcessResult(
-            "[A@0/10]:0+1",
-            "[B@0/10]:0+2",
-            "[C@0/10]:0+3",
-            "[D@0/10]:0+4",
-            "[A@0/10]:0+1+1"
-        );
-        processors.get(1).checkAndClearProcessResult();
-        processors.get(2).checkAndClearProcessResult();
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
-
-        processors.get(0).checkAndClearProcessResult(
-            "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
-            "[B@0/10]:0+2+2", "[B@5/15]:0+2",
-            "[D@0/10]:0+4+4", "[D@5/15]:0+4",
-            "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
-            "[C@0/10]:0+3+3", "[C@5/15]:0+3"
-        );
-        processors.get(1).checkAndClearProcessResult();
-        processors.get(2).checkAndClearProcessResult();
-
-        driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
-        driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
-        driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
-        driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
-        driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
-
-        processors.get(0).checkAndClearProcessResult();
-        processors.get(1).checkAndClearProcessResult(
-            "[A@0/10]:0+a",
-            "[B@0/10]:0+b",
-            "[C@0/10]:0+c",
-            "[D@0/10]:0+d",
-            "[A@0/10]:0+a+a"
-        );
-        processors.get(2).checkAndClearProcessResult(
-            "[A@0/10]:0+1+1+1%0+a",
-            "[B@0/10]:0+2+2+2%0+b",
-            "[C@0/10]:0+3+3%0+c",
-            "[D@0/10]:0+4+4%0+d",
-            "[A@0/10]:0+1+1+1%0+a+a");
-
-        driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
-        driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
-        driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
-        driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
-        driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
-
-        processors.get(0).checkAndClearProcessResult();
-        processors.get(1).checkAndClearProcessResult(
-            "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
-            "[B@0/10]:0+b+b", "[B@5/15]:0+b",
-            "[D@0/10]:0+d+d", "[D@5/15]:0+d",
-            "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
-            "[C@0/10]:0+c+c", "[C@5/15]:0+c"
-        );
-        processors.get(2).checkAndClearProcessResult(
-            "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
-            "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
-            "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
-            "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
-            "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
-        );
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+            final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+
+            processors.get(0).checkAndClearProcessResult(
+                    "[A@0/10]:0+1",
+                    "[B@0/10]:0+2",
+                    "[C@0/10]:0+3",
+                    "[D@0/10]:0+4",
+                    "[A@0/10]:0+1+1"
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+
+            processors.get(0).checkAndClearProcessResult(
+                    "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+                    "[B@0/10]:0+2+2", "[B@5/15]:0+2",
+                    "[D@0/10]:0+4+4", "[D@5/15]:0+4",
+                    "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+                    "[C@0/10]:0+3+3", "[C@5/15]:0+3"
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
+            driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
+            driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
+            driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
+            driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    "[A@0/10]:0+a",
+                    "[B@0/10]:0+b",
+                    "[C@0/10]:0+c",
+                    "[D@0/10]:0+d",
+                    "[A@0/10]:0+a+a"
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    "[A@0/10]:0+1+1+1%0+a",
+                    "[B@0/10]:0+2+2+2%0+b",
+                    "[C@0/10]:0+3+3%0+c",
+                    "[D@0/10]:0+4+4%0+d",
+                    "[A@0/10]:0+1+1+1%0+a+a");
+
+            driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
+            driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
+            driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
+            driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
+            driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
+                    "[B@0/10]:0+b+b", "[B@5/15]:0+b",
+                    "[D@0/10]:0+d+d", "[D@5/15]:0+d",
+                    "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
+                    "[C@0/10]:0+c+c", "[C@5/15]:0+c"
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
+                    "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
+                    "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
+                    "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
+                    "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
+            );
+        }
     }
 
     @Test
@@ -247,22 +223,22 @@ public class KStreamWindowAggregateTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
 
-        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(strSerde, strSerde));
-        stream1.groupByKey(Serialized.with(strSerde, strSerde))
+        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(10).advanceBy(5))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.<String, String>toStringInstance("+"),
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(strSerde)
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String())
             );
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.pipeInput(recordFactory.create(topic, null, "1"));
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic, null, "1"));
+            LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
+            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 30c0a7a..a6b6c64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -17,23 +17,19 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -48,29 +44,8 @@ import static org.junit.Assert.assertEquals;
 public class KTableForeachTest {
 
     final private String topicName = "topic";
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-foreach-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testForeach() {
@@ -101,17 +76,17 @@ public class KTableForeachTest {
         // When
         StreamsBuilder builder = new StreamsBuilder();
         KTable<Integer, String> table = builder.table(topicName,
-                                                      Consumed.with(intSerde, stringSerde),
+                                                      Consumed.with(Serdes.Integer(), Serdes.String()),
                                                       Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
-                                                              .withKeySerde(intSerde)
-                                                              .withValueSerde(stringSerde));
+                                                              .withKeySerde(Serdes.Integer())
+                                                              .withValueSerde(Serdes.String()));
         table.foreach(action);
 
         // Then
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (KeyValue<Integer, String> record: inputRecords) {
-            driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (KeyValue<Integer, String> record : inputRecords) {
+                driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+            }
         }
 
         assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index cf4460d..bcb9856 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -44,6 +44,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+/**
+ * KStreamTestDriver
+ *
+ * @deprecated please use {@link org.apache.kafka.streams.TopologyTestDriver} instead
+ */
 @Deprecated
 public class KStreamTestDriver extends ExternalResource {
 
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index a19b55c..9406519 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -58,6 +59,28 @@ public final class StreamsTestUtils {
 
     }
 
+    public static Properties topologyTestConfig(final String applicationId,
+                                                final String bootstrapServers,
+                                                final String keyDeserializer,
+                                                final String valueDeserializer) {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keyDeserializer);
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueDeserializer);
+        return props;
+    }
+
+    public static Properties topologyTestConfig(final Serde keyDeserializer,
+                                                final Serde valueDeserializer) {
+        return topologyTestConfig(
+                UUID.randomUUID().toString(),
+                "localhost:9091",
+                keyDeserializer.getClass().getName(),
+                valueDeserializer.getClass().getName());
+    }
+
     public static Properties minimalStreamsConfig() {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index f6bbc4b..7b8bdd5 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -226,7 +226,7 @@ public class TopologyTestDriver implements Closeable {
      * @param builder builder for the topology to be tested
      * @param config the configuration for the topology
      */
-    protected TopologyTestDriver(final InternalTopologyBuilder builder,
+    TopologyTestDriver(final InternalTopologyBuilder builder,
                               final Properties config) {
         this(builder, config,  System.currentTimeMillis());
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.