You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/07 16:22:02 UTC

[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

    [ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466096#comment-16466096 ] 

ASF GitHub Bot commented on KAFKA-6474:
---------------------------------------

guozhangwang closed pull request #4939: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [cleanup]
URL: https://github.com/apache/kafka/pull/4939
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the original diff once such a pull request is closed):

diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 15e55d873a4..7c2bfa6b16a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -35,9 +35,7 @@
 import org.apache.kafka.test.MockPredicate;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -58,26 +56,7 @@
 public class StreamsBuilderTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-builder-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test(expected = TopologyException.class)
     public void testFrom() {
@@ -183,10 +162,10 @@ public void shouldProcessingFromSinkTopic() {
 
         source.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+            driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        }
 
         // no exception was thrown
         assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
@@ -203,10 +182,10 @@ public void shouldProcessViaThroughTopic() {
         source.process(sourceProcessorSupplier);
         through.process(throughProcessorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+            driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+        }
 
         assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
         assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
@@ -224,13 +203,13 @@ public void testMerge() {
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
-        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
-        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
-        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+            driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
     }
@@ -250,17 +229,17 @@ public void apply(final Long key, final String value) {
                 .withValueSerde(Serdes.String()))
                 .toStream().foreach(action);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
         final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
-        driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
-
-        final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
-        assertThat(store.get(1L), equalTo("value1"));
-        assertThat(store.get(2L), equalTo("value2"));
-        assertThat(results.get(1L), equalTo("value1"));
-        assertThat(results.get(2L), equalTo("value2"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+            driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+
+            final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+            assertThat(store.get(1L), equalTo("value1"));
+            assertThat(store.get(2L), equalTo("value2"));
+            assertThat(results.get(1L), equalTo("value1"));
+            assertThat(results.get(2L), equalTo("value2"));
+        }
     }
 
     @Test
@@ -270,15 +249,15 @@ public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
                 .withKeySerde(Serdes.Long())
                 .withValueSerde(Serdes.String()));
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
         final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
-        driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
-        driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
-        final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+            driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+            final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
 
-        assertThat(store.get(1L), equalTo("value1"));
-        assertThat(store.get(2L), equalTo("value2"));
+            assertThat(store.get(1L), equalTo("value1"));
+            assertThat(store.get(2L), equalTo("value2"));
+        }
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 8c50afeda6f..da8b102fde6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -20,7 +20,6 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -28,8 +27,7 @@
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -50,7 +48,6 @@
     private KStream<String, String> stream;
     private KeyValueMapper<String, String, String> keyValueMapper;
     private ForeachAction<String, String> action;
-    private TopologyTestDriver driver;
 
 
     @Before
@@ -72,14 +69,6 @@ public void apply(final String key, final String value) {
         };
     }
 
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void shouldLeftJoinWithStream() {
         stream
@@ -110,21 +99,17 @@ public void shouldInnerJoinWithStream() {
 
     private void verifyJoin(final Map<String, String> expected) {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "global-ktable-joins-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        // write some data to the global table
-        driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
-        driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
-        //write some data to the stream
-        driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
-        driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
-        driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            // write some data to the global table
+            driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
+            driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
+            //write some data to the stream
+            driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
+            driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
+            driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
+        }
 
         assertEquals(expected, results);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index b9ca30f48cc..e7a9226d89b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -50,8 +49,7 @@
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -78,28 +76,13 @@
     private KGroupedStream<String, String> groupedStream;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
 
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-stream-impl-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
     }
 
     @SuppressWarnings("deprecation")
@@ -224,13 +207,14 @@ public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() {
     }
 
     private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        }
         assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -298,13 +282,14 @@ public void apply(final Windowed<String> key, final Integer value) {
     }
 
     private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+        }
         assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -341,13 +326,14 @@ public void apply(final Windowed<String> key, final Long value) {
     }
 
     private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+        }
         assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
         assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
         assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -554,26 +540,30 @@ public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
     public void shouldCountAndMaterializeResults() {
         groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
+            final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
 
-        assertThat(count.get("1"), equalTo(3L));
-        assertThat(count.get("2"), equalTo(1L));
-        assertThat(count.get("3"), equalTo(2L));
+            assertThat(count.get("1"), equalTo(3L));
+            assertThat(count.get("2"), equalTo(1L));
+            assertThat(count.get("3"), equalTo(2L));
+        }
     }
 
     @Test
     public void shouldLogAndMeasureSkipsInAggregate() {
         groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        processData();
-        LogCaptureAppender.unregister(appender);
-
-        final Map<MetricName, ? extends Metric> metrics = driver.metrics();
-        assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
-        assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            LogCaptureAppender.unregister(appender);
+
+            final Map<MetricName, ? extends Metric> metrics = driver.metrics();
+            assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+            assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        }
     }
 
 
@@ -586,13 +576,15 @@ public void shouldReduceAndMaterializeResults() {
                 .withKeySerde(Serdes.String())
                 .withValueSerde(Serdes.String()));
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
+            final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
 
-        assertThat(reduced.get("1"), equalTo("A+C+D"));
-        assertThat(reduced.get("2"), equalTo("B"));
-        assertThat(reduced.get("3"), equalTo("E+F"));
+            assertThat(reduced.get("1"), equalTo("A+C+D"));
+            assertThat(reduced.get("2"), equalTo("B"));
+            assertThat(reduced.get("3"), equalTo("E+F"));
+        }
     }
 
     @Test
@@ -605,13 +597,15 @@ public void shouldLogAndMeasureSkipsInReduce() {
         );
 
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        processData();
-        LogCaptureAppender.unregister(appender);
-
-        final Map<MetricName, ? extends Metric> metrics = driver.metrics();
-        assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
-        assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            LogCaptureAppender.unregister(appender);
+
+            final Map<MetricName, ? extends Metric> metrics = driver.metrics();
+            assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+            assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+        }
     }
 
 
@@ -625,13 +619,15 @@ public void shouldAggregateAndMaterializeResults() {
                 .withKeySerde(Serdes.String())
                 .withValueSerde(Serdes.String()));
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
+            final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
 
-        assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
-        assertThat(aggregate.get("2"), equalTo("0+B"));
-        assertThat(aggregate.get("3"), equalTo("0+E+F"));
+            assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
+            assertThat(aggregate.get("2"), equalTo("0+B"));
+            assertThat(aggregate.get("3"), equalTo("0+E+F"));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -649,15 +645,16 @@ public void apply(final String key, final String value) {
                 }
             });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
 
-        assertThat(results.get("1"), equalTo("0+A+C+D"));
-        assertThat(results.get("2"), equalTo("0+B"));
-        assertThat(results.get("3"), equalTo("0+E+F"));
+            assertThat(results.get("1"), equalTo("0+A+C+D"));
+            assertThat(results.get("2"), equalTo("0+B"));
+            assertThat(results.get("3"), equalTo("0+E+F"));
+        }
     }
 
-    private void processData() {
-        driver = new TopologyTestDriver(builder.build(), props);
+    private void processData(final TopologyTestDriver driver) {
         driver.pipeInput(recordFactory.create(TOPIC, "1", "A"));
         driver.pipeInput(recordFactory.create(TOPIC, "2", "B"));
         driver.pipeInput(recordFactory.create(TOPIC, "1", "C"));
@@ -668,22 +665,23 @@ private void processData() {
     }
 
     private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
-        driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
-        driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
-        driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
+            driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+            driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+        }
         assertThat(results, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
-            KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
         )));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 742f3496579..05d339f9ad9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
@@ -39,8 +38,7 @@
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -60,29 +58,13 @@
     private final StreamsBuilder builder = new StreamsBuilder();
     private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
     private final String topic = "input";
 
     @Before
     public void before() {
         groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
                 .groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
-
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-table-impl-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
     }
 
     @Test
@@ -140,30 +122,31 @@ public void shouldNotAllowNullStoreSupplierOnReduce() {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (StateStoreSupplier<KeyValueStore>) null);
     }
 
-    private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) {
-        final Map<String, Integer> results = new HashMap<>();
-        final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
-        reduced.foreach(new ForeachAction<String, Integer>() {
+    private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
+        final Map<String, Integer> reducedResults = new HashMap<>();
+        inputKTable.foreach(new ForeachAction<String, Integer>() {
             @Override
             public void apply(final String key, final Integer value) {
-                results.put(key, value);
+                reducedResults.put(key, value);
             }
         });
-
-        driver = new TopologyTestDriver(builder.build(), props);
+        return reducedResults;
+    }
+    private void assertReduced(final Map<String, Integer> reducedResults, final String topic, final TopologyTestDriver driver) {
+        final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
         driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
         driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
 
-        assertEquals(Integer.valueOf(1), results.get("A"));
-        assertEquals(Integer.valueOf(2), results.get("B"));
+        assertEquals(Integer.valueOf(1), reducedResults.get("A"));
+        assertEquals(Integer.valueOf(2), reducedResults.get("B"));
 
         driver.pipeInput(recordFactory.create(topic, "A", 2.6, 10));
         driver.pipeInput(recordFactory.create(topic, "B", 1.3, 10));
         driver.pipeInput(recordFactory.create(topic, "A", 5.7, 10));
         driver.pipeInput(recordFactory.create(topic, "B", 6.2, 10));
 
-        assertEquals(Integer.valueOf(5), results.get("A"));
-        assertEquals(Integer.valueOf(6), results.get("B"));
+        assertEquals(Integer.valueOf(5), reducedResults.get("A"));
+        assertEquals(Integer.valueOf(6), reducedResults.get("B"));
     }
 
     @Test
@@ -184,8 +167,11 @@ public void shouldReduce() {
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
 
-        doShouldReduce(reduced, topic);
-        assertEquals(reduced.queryableStoreName(), "reduced");
+        final Map<String, Integer> results = getReducedResults(reduced);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertReduced(results, topic, driver);
+            assertEquals(reduced.queryableStoreName(), "reduced");
+        }
     }
 
     @Test
@@ -206,8 +192,11 @@ public void shouldReduceWithInternalStoreName() {
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
 
-        doShouldReduce(reduced, topic);
-        assertNull(reduced.queryableStoreName());
+        final Map<String, Integer> results = getReducedResults(reduced);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertReduced(results, topic, driver);
+            assertNull(reduced.queryableStoreName());
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -229,10 +218,13 @@ public void shouldReduceAndMaterializeResults() {
                                 .withKeySerde(Serdes.String())
                                 .withValueSerde(Serdes.Integer()));
 
-        doShouldReduce(reduced, topic);
-        final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
-        assertThat(reduce.get("A"), equalTo(5));
-        assertThat(reduce.get("B"), equalTo(6));
+        final Map<String, Integer> results = getReducedResults(reduced);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            assertReduced(results, topic, driver);
+            final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
+            assertThat(reduce.get("A"), equalTo(5));
+            assertThat(reduce.get("B"), equalTo(6));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -246,10 +238,12 @@ public void shouldCountAndMaterializeResults() {
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));
 
-        processData(topic);
-        final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
-        assertThat(counts.get("1"), equalTo(3L));
-        assertThat(counts.get("2"), equalTo(2L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(topic, driver);
+            final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
+            assertThat(counts.get("1"), equalTo(3L));
+            assertThat(counts.get("2"), equalTo(2L));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -266,10 +260,12 @@ public void shouldAggregateAndMaterializeResults() {
                                    .withValueSerde(Serdes.String())
                                    .withKeySerde(Serdes.String()));
 
-        processData(topic);
-        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
-        assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
-        assertThat(aggregate.get("2"), equalTo("0+2+2"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(topic, driver);
+            final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
+            assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
+            assertThat(aggregate.get("2"), equalTo("0+2+2"));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -327,8 +323,7 @@ public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
                                (Materialized) null);
     }
 
-    private void processData(final String topic) {
-        driver = new TopologyTestDriver(builder.build(), props);
+    private void processData(final String topic, final TopologyTestDriver driver) {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         driver.pipeInput(recordFactory.create(topic, "A", "1"));
         driver.pipeInput(recordFactory.create(topic, "B", "1"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index bd3d60be8b5..2aa8aaceae0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -21,16 +21,13 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.List;
@@ -42,26 +39,7 @@
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-branch-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @SuppressWarnings("unchecked")
     @Test
@@ -102,9 +80,10 @@ public boolean test(Integer key, String value) {
             branches[i].process(supplier);
         }
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         final List<MockProcessor<Integer, String>> processors = supplier.capturedProcessors(3);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index d338fe3f99b..51a994b6add 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -21,15 +21,12 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -40,28 +37,9 @@
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-filter-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-  
-    private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
+    private final Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
         public boolean test(Integer key, String value) {
             return (key % 3) == 0;
@@ -80,9 +58,10 @@ public void testFilter() {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filter(isMultipleOfThree).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(2, supplier.theCapturedProcessor().processed.size());
@@ -100,9 +79,10 @@ public void testFilterNot() {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filterNot(isMultipleOfThree).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(5, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 9ce24b556af..3173dcfca5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -22,15 +22,12 @@
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -42,26 +39,7 @@
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMap() {
@@ -88,9 +66,10 @@ public void testFlatMap() {
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.flatMap(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(6, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 221b02b22ef..471b1279b87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -20,16 +20,13 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -41,26 +38,7 @@
 
     private String topicName = "topic";
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-values-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
@@ -83,10 +61,11 @@ public void testFlatMapValues() {
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         stream.flatMapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (final int expectedKey : expectedKeys) {
-            // passing the timestamp to recordFactory.create to disambiguate the call
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (final int expectedKey : expectedKeys) {
+                // passing the timestamp to recordFactory.create to disambiguate the call
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+            }
         }
 
         String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
@@ -117,10 +96,11 @@ public void testFlatMapValuesWithKeys() {
 
         stream.flatMapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (final int expectedKey : expectedKeys) {
-            // passing the timestamp to recordFactory.create to disambiguate the call
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (final int expectedKey : expectedKeys) {
+                // passing the timestamp to recordFactory.create to disambiguate the call
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+            }
         }
 
         String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index b975c96e7d1..83a20a60346 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -17,20 +17,16 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -45,29 +41,7 @@
 
     private final String topicName = "topic";
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private Properties props = new Properties();
-
-    @Before
-    public void before() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-foreach-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Serde<String> stringSerde = Serdes.String();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testForeach() {
@@ -97,13 +71,14 @@ public void apply(Integer key, String value) {
 
         // When
         StreamsBuilder builder = new StreamsBuilder();
-        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.foreach(action);
 
         // Then
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (KeyValue<Integer, String> record: inputRecords) {
-            driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (KeyValue<Integer, String> record : inputRecords) {
+                driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+            }
         }
 
         assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 6e5b816f5ed..c37e8a954ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -17,22 +17,20 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,8 +45,6 @@
 
     private final String streamTopic = "streamTopic";
     private final String globalTableTopic = "globalTableTopic";
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Serde<String> stringSerde = Serdes.String();
     private TopologyTestDriver driver;
     private MockProcessor<Integer, String> processor;
     private final int[] expectedKeys = {0, 1, 2, 3};
@@ -63,8 +59,8 @@ public void setUp() {
         final KeyValueMapper<Integer, String, String> keyMapper;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
-        final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+        final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
         table = builder.globalTable(globalTableTopic, tableConsumed);
         keyMapper = new KeyValueMapper<Integer, String, String>() {
@@ -78,13 +74,7 @@ public String apply(final Integer key, final String value) {
         };
         stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
@@ -92,10 +82,7 @@ public String apply(final Integer key, final String value) {
 
     @After
     public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
+        driver.close();
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index b3551baf961..eb0775a0847 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -17,13 +17,11 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -32,7 +30,8 @@
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,8 +45,6 @@
 
     final private String streamTopic = "streamTopic";
     final private String globalTableTopic = "globalTableTopic";
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
 
     private MockProcessor<Integer, String> processor;
     private TopologyTestDriver driver;
@@ -64,8 +61,8 @@ public void setUp() {
         final KeyValueMapper<Integer, String, String> keyMapper;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
-        final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+        final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
         table = builder.globalTable(globalTableTopic, tableConsumed);
         keyMapper = new KeyValueMapper<Integer, String, String>() {
@@ -79,18 +76,17 @@ public String apply(final Integer key, final String value) {
         };
         stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
     }
 
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
     private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
         final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 797575d8bb1..49e8aaae026 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -24,7 +23,6 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -49,8 +47,7 @@
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -69,48 +66,28 @@
 
 public class KStreamImplTest {
 
-    private final Serde<String> stringSerde = Serdes.String();
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
-
     private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
 
     private KStream<String, String> testStream;
     private StreamsBuilder builder;
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Before
     public void before() {
         builder = new StreamsBuilder();
         testStream = builder.stream("source");
-
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-impl-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
     }
 
     @Test
     public void testNumProcesses() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
+        KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
 
-        KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
+        KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
 
         KStream<String, String> stream1 =
             source1.filter(new Predicate<String, String>() {
@@ -170,7 +147,7 @@ public boolean test(String key, Integer value) {
         );
 
         final int anyWindowSize = 1;
-        final Joined<String, Integer, Integer> joined = Joined.with(stringSerde, intSerde, intSerde);
+        final Joined<String, Integer, Integer> joined = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
         KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
@@ -205,9 +182,8 @@ public Integer apply(Integer value1, Integer value2) {
     @Test
     public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-        KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
-        KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
+        KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
+        KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
 
         stream1.to("topic-5");
         stream2.through("topic-6");
@@ -224,11 +200,12 @@ public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
     public void shouldSendDataThroughTopicUsingProduced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
-        final KStream<String, String> stream = builder.stream(input, consumed);
-        stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
+        final KStream<String, String> stream = builder.stream(input, stringConsumed);
+        stream.through("through-topic", Produced.with(Serdes.String(), Serdes.String())).process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(input, "a", "b"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(input, "a", "b"));
+        }
         assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("a:b")));
     }
 
@@ -236,12 +213,13 @@ public void shouldSendDataThroughTopicUsingProduced() {
     public void shouldSendDataToTopicUsingProduced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
-        final KStream<String, String> stream = builder.stream(input, consumed);
-        stream.to("to-topic", Produced.with(stringSerde, stringSerde));
-        builder.stream("to-topic", consumed).process(processorSupplier);
+        final KStream<String, String> stream = builder.stream(input, stringConsumed);
+        stream.to("to-topic", Produced.with(Serdes.String(), Serdes.String()));
+        builder.stream("to-topic", stringConsumed).process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create(input, "e", "f"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(input, "e", "f"));
+        }
         assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f")));
     }
 
@@ -249,7 +227,7 @@ public void shouldSendDataToTopicUsingProduced() {
     // TODO: this test should be refactored when we removed KStreamBuilder so that the created Topology contains internal topics as well
     public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
         final KStreamBuilder builder = new KStreamBuilder();
-        KStream<String, String> kStream = builder.stream(stringSerde, stringSerde, "topic-1");
+        KStream<String, String> kStream = builder.stream(Serdes.String(), Serdes.String(), "topic-1");
         ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
         long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
         final KStream<String, String> stream = kStream
@@ -282,9 +260,8 @@ public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningT
     @Test
     public void testToWithNullValueSerdeDoesntNPE() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-        final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), consumed);
-        inputStream.to(stringSerde, null, "output");
+        final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), stringConsumed);
+        inputStream.to(Serdes.String(), null, "output");
     }
 
     @Test(expected = NullPointerException.class)
@@ -477,7 +454,7 @@ public void shouldThrowNullPointerOnToWhenProducedIsNull() {
 
     @Test
     public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
-        final KTable<String, String> table = builder.table("blah", consumed);
+        final KTable<String, String> table = builder.table("blah", stringConsumed);
         try {
             testStream.leftJoin(table,
                                 MockValueJoiner.TOSTRING_JOINER,
@@ -490,7 +467,7 @@ public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
 
     @Test
     public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
-        final KTable<String, String> table = builder.table("blah", consumed);
+        final KTable<String, String> table = builder.table("blah", stringConsumed);
         try {
             testStream.join(table,
                             MockValueJoiner.TOSTRING_JOINER,
@@ -522,12 +499,12 @@ public void shouldMergeTwoStreams() {
 
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
-        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
-        driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
-        driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+            driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
     }
@@ -547,16 +524,16 @@ public void shouldMergeMultipleStreams() {
 
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
-        driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
-        driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
-        driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
-        driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
-        driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
-        driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
-        driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+            driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
+            driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
+            driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
+            driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
+            driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
+            driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
                      processorSupplier.theCapturedProcessor().processed);
@@ -568,13 +545,13 @@ public void shouldProcessFromSourceThatMatchPattern() {
 
         pattern2Source.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
-        driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
-        driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
-        driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
-        driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+            driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+            driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
+            driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
+            driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
                 processorSupplier.theCapturedProcessor().processed);
@@ -591,13 +568,13 @@ public void shouldProcessFromSourcesThatMatchMultiplePattern() {
 
         merged.process(processorSupplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
-        driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
-        driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
-        driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
-        driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+            driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+            driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
+            driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
+            driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
+        }
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
                 processorSupplier.theCapturedProcessor().processed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 5d849eee5d4..de3446c1a08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -17,26 +17,22 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -55,38 +51,16 @@
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-
-    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setUp() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, Integer> left = builder.stream("left", Consumed.with(stringSerde, intSerde));
-        final KStream<String, Integer> right = builder.stream("right", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
         final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
 
         left.join(
@@ -98,17 +72,18 @@ public Integer apply(final Integer value1, final Integer value2) {
                 }
             },
             JoinWindows.of(100),
-            Joined.with(stringSerde, intSerde, intSerde)
+            Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
         );
 
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver = new TopologyTestDriver(builder.build(), props);
-        driver.pipeInput(recordFactory.create("left", "A", null));
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create("left", "A", null));
+            LogCaptureAppender.unregister(appender);
 
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
 
-        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        }
     }
 
     @Test
@@ -127,7 +102,7 @@ public void testJoin() {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -135,81 +110,82 @@ public void testJoin() {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        // push two items to the primary stream. the other window is empty
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = {}
+            // push two items to the primary stream. the other window is empty
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // push all four items to the primary stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push all four items to the primary stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // push all items to the other stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // push all four items to the primary stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
-        // push two items to the other stream. this should produce six item.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            // push two items to the other stream. this should produce six item.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+        }
     }
 
     @Test
@@ -229,88 +205,89 @@ public void testOuterJoin() {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        // push two items to the primary stream. the other window is empty.this should produce two items
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = {}
+            // push two items to the primary stream. the other window is empty. this should produce two items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // push all four items to the primary stream. this should produce four items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push all four items to the primary stream. this should produce four items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
-        // push all items to the other stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // push all four items to the primary stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
-        }
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
-        // push two items to the other stream. this should produce six item.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            // push two items to the other stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+        }
     }
 
     @Test
@@ -332,7 +309,7 @@ public void testWindowing() {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -340,197 +317,198 @@ public void testWindowing() {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
 
-        // push two items to the primary stream. the other window is empty. this should produce no items.
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = {}
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
-        }
+            // push two items to the primary stream. the other window is empty. this should produce no items.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = {}
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+            }
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        //     w2 = { 0:Y0, 1:Y1 }
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            //     w2 = { 0:Y0, 1:Y1 }
 
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
-        }
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        // clear logically
-        time = 1000L;
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
+            // clear logically
+            time = 1000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
 
-        // gradually expires items in w1
-        // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+            // gradually expires items in w1
+            // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
 
-        time += 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("3:X3+YY3");
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // go back to the time before expiration
+            // go back to the time before expiration
 
-        time = 1000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time = 1000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0");
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        time += 1;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        time += 1;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            time += 1;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        // clear (logically)
-        time = 2000L;
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
-        }
+            // clear (logically)
+            time = 2000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // gradually expires items in w2
-        // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+            // gradually expires items in w2
+            // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
-        time = 2000L + 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time = 2000L + 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("3:XX3+Y3");
+            processor.checkAndClearProcessResult("3:XX3+Y3");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        // go back to the time before expiration
+            // go back to the time before expiration
 
-        time = 2000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time = 2000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult();
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0");
+            processor.checkAndClearProcessResult("0:XX0+Y0");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
 
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+        }
     }
 
     @Test
@@ -552,9 +530,9 @@ public void testAsymmetricWindowingAfter() {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(0).after(100),
-            Joined.with(intSerde,
-                stringSerde,
-                stringSerde));
+            Joined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -562,85 +540,85 @@ public void testAsymmetricWindowingAfter() {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
 
+            time = 1000L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult();
 
-        processor.checkAndClearProcessResult();
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
-        processor.checkAndClearProcessResult("0:X0+YY0");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time = 1000 + 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000 + 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
-        processor.checkAndClearProcessResult("3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            processor.checkAndClearProcessResult();
         }
-
-        processor.checkAndClearProcessResult();
     }
 
     @Test
@@ -663,7 +641,7 @@ public void testAsymmetricWindowingBefore() {
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(0).before(100),
-            Joined.with(intSerde, stringSerde, stringSerde));
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -671,84 +649,84 @@ public void testAsymmetricWindowingBefore() {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
 
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
 
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
 
+            time = 1000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult();
 
-        processor.checkAndClearProcessResult();
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
-        processor.checkAndClearProcessResult("0:X0+YY0");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time = 1000L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time = 1000L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
-        processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
-        }
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
-        processor.checkAndClearProcessResult("3:X3+YY3");
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            }
 
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+            processor.checkAndClearProcessResult();
         }
-
-        processor.checkAndClearProcessResult();
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index c67e13df223..11c5c5b9852 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -17,24 +17,20 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -50,30 +46,9 @@
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private Properties props = new Properties();
-
-    @Before
-    public void setUp() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-left-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testLeftJoin() {
@@ -91,7 +66,7 @@ public void testLeftJoin() {
         joined = stream1.leftJoin(stream2,
                                   MockValueJoiner.TOSTRING_JOINER,
                                   JoinWindows.of(100),
-                                  Joined.with(intSerde, stringSerde, stringSerde));
+                                  Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -99,65 +74,66 @@ public void testLeftJoin() {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-
-        // push two items to the primary stream. the other window is empty
-        // w1 {}
-        // w2 {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = {}
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
-        }
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
-        // push two items to the other stream. this should produce two items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = { 0:Y0, 1:Y1 }
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
-        }
-
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-
-        // push three items to the primary stream. this should produce four items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // --> w2 = { 0:Y0, 1:Y1 }
-
-        for (int i = 0; i < 3; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            // push two items to the primary stream. the other window is empty
+            // w1 {}
+            // w2 {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should produce two items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+            }
+
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+            // push three items to the primary stream. this should produce four items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // --> w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 3; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+            }
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
+
+            // push all items to the other stream. this should produce 5 items
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // w2 = { 0:Y0, 1:Y1 }
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+            }
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
+
+            // push all four items to the primary stream. this should produce six items.
+            // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+            // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+            // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+            // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
         }
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
-
-        // push all items to the other stream. this should produce 5 items
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // w2 = { 0:Y0, 1:Y1 }
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
-        }
-        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
-
-        // push all four items to the primary stream. this should produce six items.
-        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
-        // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-        // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
     }
 
     @Test
@@ -176,7 +152,7 @@ public void testWindowing() {
         joined = stream1.leftJoin(stream2,
                                   MockValueJoiner.TOSTRING_JOINER,
                                   JoinWindows.of(100),
-                                  Joined.with(intSerde, stringSerde, stringSerde));
+                                  Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -184,111 +160,112 @@ public void testWindowing() {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver = new TopologyTestDriver(builder.build(), props, time);
-
-        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-
-        // push two items to the primary stream. the other window is empty. this should produce two items
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = {}
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
-        }
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
-        // push two items to the other stream. this should produce no items.
-        // w1 = { 0:X0, 1:X1 }
-        // w2 = {}
-        // --> w1 = { 0:X0, 1:X1 }
-        // --> w2 = { 0:Y0, 1:Y1 }
-
-        for (int i = 0; i < 2; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
-        }
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-
-        // clear logically
-        time = 1000L;
-
-        // push all items to the other stream. this should produce no items.
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = {}
-        // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
-        }
-        processor.checkAndClearProcessResult();
-
-        // gradually expire items in window 2.
-        // w1 = {}
-        // w2 = {}
-        // --> w1 = {}
-        // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-
-        time = 1000L + 100L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
-        // go back to the time before expiration
-
-        time = 1000L - 100L - 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
-        }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
-
-        time += 1L;
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
+
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            // push two items to the primary stream. the other window is empty. this should produce two items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+            }
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should produce no items.
+            // w1 = { 0:X0, 1:X1 }
+            // w2 = {}
+            // --> w1 = { 0:X0, 1:X1 }
+            // --> w2 = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
+            }
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+            // clear logically
+            time = 1000L;
+
+            // push all items to the other stream. this should produce no items.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = {}
+            // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
+            }
+            processor.checkAndClearProcessResult();
+
+            // gradually expire items in window 2.
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = {}
+            // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+
+            time = 1000L + 100L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            // go back to the time before expiration
+
+            time = 1000L - 100L - 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+
+            time += 1L;
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+            }
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
         }
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index ec31b5a1d60..0ce27ab51cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -17,22 +17,21 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,8 +51,6 @@
     private final String streamTopic = "streamTopic";
     private final String tableTopic = "tableTopic";
 
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
 
     private final int[] expectedKeys = {0, 1, 2, 3};
@@ -70,23 +67,22 @@ public void setUp() {
         final KTable<Integer, String> table;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
         stream = builder.stream(streamTopic, consumed);
         table = builder.table(tableTopic, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
     }
 
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
     private void pushToStream(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
             driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 735f71c2ef0..eedda074a41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -17,13 +17,11 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -31,7 +29,8 @@
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -48,8 +47,6 @@
     final private String streamTopic = "streamTopic";
     final private String tableTopic = "tableTopic";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
     private TopologyTestDriver driver;
     private MockProcessor<Integer, String> processor;
@@ -66,23 +63,22 @@ public void setUp() {
         final KTable<Integer, String> table;
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
         stream = builder.stream(streamTopic, consumed);
         table = builder.table(tableTopic, consumed);
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+        final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
         processor = supplier.theCapturedProcessor();
     }
 
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
     private void pushToStream(final int messageCount, final String valuePrefix) {
         for (int i = 0; i < messageCount; i++) {
             driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index b0a383ba23a..b55d8e1b8e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -17,21 +17,17 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -41,30 +37,8 @@
 public class KStreamMapTest {
 
     private String topicName = "topic";
-
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMap() {
@@ -80,15 +54,14 @@ public void testMap() {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
-        MockProcessorSupplier<String, Integer> supplier;
-
-        supplier = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.map(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+            }
         }
 
         assertEquals(4, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index ed110383001..95593aad467 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -17,21 +17,17 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -41,33 +37,9 @@
 public class KStreamMapValuesTest {
 
     private String topicName = "topic";
-
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
-
-
+    private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-values-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testFlatMapValues() {
@@ -83,13 +55,13 @@ public Integer apply(CharSequence value) {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        KStream<Integer, String> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.mapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+            }
         }
         String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
 
@@ -110,13 +82,13 @@ public Integer apply(final Integer readOnlyKey, final CharSequence value) {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        KStream<Integer, String> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.mapValues(mapper).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+            }
         }
         String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 2c6ff81b8f6..137aa6f473f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -17,20 +17,16 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -43,53 +39,33 @@
 public class KStreamPeekTest {
 
     private final String topicName = "topic";
-    private final Serde<Integer> intSerd = Serdes.Integer();
-    private final Serde<String> stringSerd = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-peek-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void shouldObserveStreamElements() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
+        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
         stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
 
-        driver = new TopologyTestDriver(builder.build(), props);
-        final List<KeyValue<Integer, String>> expected = new ArrayList<>();
-        for (int key = 0; key < 32; key++) {
-            final String value = "V" + key;
-            driver.pipeInput(recordFactory.create(topicName, key, value));
-            expected.add(new KeyValue<>(key, value));
-        }
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final List<KeyValue<Integer, String>> expected = new ArrayList<>();
+            for (int key = 0; key < 32; key++) {
+                final String value = "V" + key;
+                driver.pipeInput(recordFactory.create(topicName, key, value));
+                expected.add(new KeyValue<>(key, value));
+            }
 
-        assertEquals(expected, peekObserved);
-        assertEquals(expected, streamObserved);
+            assertEquals(expected, peekObserved);
+            assertEquals(expected, streamObserved);
+        }
     }
 
     @Test
     public void shouldNotAllowNullAction() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
+        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         try {
             stream.peek(null);
             fail("expected null action to throw NPE");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 1abc0b99be6..b030233fa82 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -17,21 +17,17 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -44,29 +40,8 @@
 
     private String topicName = "topic_key_select";
 
-    final private Serde<Integer> integerSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(topicName, new StringSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-select-key-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
 
     @Test
     public void testSelectKey() {
@@ -88,16 +63,16 @@ public String apply(Object key, Number value) {
         final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"};
         final int[] expectedValues = new int[]{1, 2, 3};
 
-        KStream<String, Integer>  stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));
+        KStream<String, Integer>  stream = builder.stream(topicName, Consumed.with(Serdes.String(), Serdes.Integer()));
 
         MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
         stream.selectKey(selector).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (int expectedValue : expectedValues) {
-            driver.pipeInput(recordFactory.create(expectedValue));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int expectedValue : expectedValues) {
+                driver.pipeInput(recordFactory.create(expectedValue));
+            }
         }
 
         assertEquals(3, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 1567fe1657c..8a05aac7fa9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -17,12 +17,10 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
@@ -33,9 +31,7 @@
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -47,33 +43,12 @@
 
     private String topicName = "topic";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
 
     @Rule
     public final KStreamTestDriver kstreamDriver = new KStreamTestDriver();
 
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
     @Test
     public void testTransform() {
         StreamsBuilder builder = new StreamsBuilder();
@@ -102,7 +77,7 @@ public void close() {}
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
         kstreamDriver.setUp(builder);
@@ -161,18 +136,19 @@ public void close() {}
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
-        }
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+            }
 
-        // This tick will yield yields the "-1:2" result
-        driver.advanceWallClockTime(2);
-        // This tick further advances the clock to 3, which leads to the "-1:3" result
-        driver.advanceWallClockTime(1);
+            // This tick will yield yields the "-1:2" result
+            driver.advanceWallClockTime(2);
+            // This tick further advances the clock to 3, which leads to the "-1:3" result
+            driver.advanceWallClockTime(1);
+        }
 
         assertEquals(6, processor.theCapturedProcessor().processed.size());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 6bfc813077e..419e6f1333f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -17,11 +17,9 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
@@ -34,9 +32,7 @@
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -47,31 +43,9 @@
 public class KStreamTransformValuesTest {
 
     private String topicName = "topic";
-
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
-
+    private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
 
     @Test
     public void testTransform() {
@@ -109,13 +83,13 @@ public void close() {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, Integer> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transformValues(valueTransformerSupplier).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+            }
         }
         String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
 
@@ -152,13 +126,13 @@ public void close() {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, Integer> stream;
-        stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transformValues(valueTransformerSupplier).process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (int expectedKey : expectedKeys) {
-            driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            for (int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+            }
         }
         String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 9050edb1941..7a2a8e0958b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -16,14 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -39,9 +37,7 @@
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.List;
@@ -54,28 +50,8 @@
 
 public class KStreamWindowAggregateTest {
 
-    final private Serde<String> strSerde = Serdes.String();
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-window-aggregate-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testAggBasic() {
@@ -83,31 +59,31 @@ public void testAggBasic() {
         final String topic1 = "topic1";
 
         final KTable<Windowed<String>, String> table2 = builder
-            .stream(topic1, Consumed.with(strSerde, strSerde))
-            .groupByKey(Serialized.with(strSerde, strSerde))
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
+            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic1-Canonized");
 
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
+        }
 
 
         assertEquals(
@@ -141,16 +117,16 @@ public void testJoin() {
         final String topic2 = "topic2";
 
         final KTable<Windowed<String>, String> table1 = builder
-            .stream(topic1, Consumed.with(strSerde, strSerde))
-            .groupByKey(Serialized.with(strSerde, strSerde))
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
+            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic1-Canonized");
 
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table1.toStream().process(supplier);
 
         final KTable<Windowed<String>, String> table2 = builder
-            .stream(topic2, Consumed.with(strSerde, strSerde)).groupByKey(Serialized.with(strSerde, strSerde))
-            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic2-Canonized");
+            .stream(topic2, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic2-Canonized");
 
         table2.toStream().process(supplier);
 
@@ -162,84 +138,84 @@ public String apply(final String p1, final String p2) {
             }
         }).toStream().process(supplier);
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
-
-        final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
-
-        processors.get(0).checkAndClearProcessResult(
-            "[A@0/10]:0+1",
-            "[B@0/10]:0+2",
-            "[C@0/10]:0+3",
-            "[D@0/10]:0+4",
-            "[A@0/10]:0+1+1"
-        );
-        processors.get(1).checkAndClearProcessResult();
-        processors.get(2).checkAndClearProcessResult();
-
-        driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
-        driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
-        driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
-        driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
-
-        processors.get(0).checkAndClearProcessResult(
-            "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
-            "[B@0/10]:0+2+2", "[B@5/15]:0+2",
-            "[D@0/10]:0+4+4", "[D@5/15]:0+4",
-            "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
-            "[C@0/10]:0+3+3", "[C@5/15]:0+3"
-        );
-        processors.get(1).checkAndClearProcessResult();
-        processors.get(2).checkAndClearProcessResult();
-
-        driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
-        driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
-        driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
-        driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
-        driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
-
-        processors.get(0).checkAndClearProcessResult();
-        processors.get(1).checkAndClearProcessResult(
-            "[A@0/10]:0+a",
-            "[B@0/10]:0+b",
-            "[C@0/10]:0+c",
-            "[D@0/10]:0+d",
-            "[A@0/10]:0+a+a"
-        );
-        processors.get(2).checkAndClearProcessResult(
-            "[A@0/10]:0+1+1+1%0+a",
-            "[B@0/10]:0+2+2+2%0+b",
-            "[C@0/10]:0+3+3%0+c",
-            "[D@0/10]:0+4+4%0+d",
-            "[A@0/10]:0+1+1+1%0+a+a");
-
-        driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
-        driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
-        driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
-        driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
-        driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
-
-        processors.get(0).checkAndClearProcessResult();
-        processors.get(1).checkAndClearProcessResult(
-            "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
-            "[B@0/10]:0+b+b", "[B@5/15]:0+b",
-            "[D@0/10]:0+d+d", "[D@5/15]:0+d",
-            "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
-            "[C@0/10]:0+c+c", "[C@5/15]:0+c"
-        );
-        processors.get(2).checkAndClearProcessResult(
-            "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
-            "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
-            "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
-            "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
-            "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
-        );
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+            final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+
+            processors.get(0).checkAndClearProcessResult(
+                    "[A@0/10]:0+1",
+                    "[B@0/10]:0+2",
+                    "[C@0/10]:0+3",
+                    "[D@0/10]:0+4",
+                    "[A@0/10]:0+1+1"
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+
+            processors.get(0).checkAndClearProcessResult(
+                    "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+                    "[B@0/10]:0+2+2", "[B@5/15]:0+2",
+                    "[D@0/10]:0+4+4", "[D@5/15]:0+4",
+                    "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+                    "[C@0/10]:0+3+3", "[C@5/15]:0+3"
+            );
+            processors.get(1).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+
+            driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
+            driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
+            driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
+            driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
+            driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    "[A@0/10]:0+a",
+                    "[B@0/10]:0+b",
+                    "[C@0/10]:0+c",
+                    "[D@0/10]:0+d",
+                    "[A@0/10]:0+a+a"
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    "[A@0/10]:0+1+1+1%0+a",
+                    "[B@0/10]:0+2+2+2%0+b",
+                    "[C@0/10]:0+3+3%0+c",
+                    "[D@0/10]:0+4+4%0+d",
+                    "[A@0/10]:0+1+1+1%0+a+a");
+
+            driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
+            driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
+            driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
+            driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
+            driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
+
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(1).checkAndClearProcessResult(
+                    "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
+                    "[B@0/10]:0+b+b", "[B@5/15]:0+b",
+                    "[D@0/10]:0+d+d", "[D@5/15]:0+d",
+                    "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
+                    "[C@0/10]:0+c+c", "[C@5/15]:0+c"
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
+                    "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
+                    "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
+                    "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
+                    "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
+            );
+        }
     }
 
     @Test
@@ -247,22 +223,22 @@ public void shouldLogAndMeterWhenSkippingNullKey() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
 
-        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(strSerde, strSerde));
-        stream1.groupByKey(Serialized.with(strSerde, strSerde))
+        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(10).advanceBy(5))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.<String, String>toStringInstance("+"),
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(strSerde)
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String())
             );
 
-        driver = new TopologyTestDriver(builder.build(), props, 0L);
-
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.pipeInput(recordFactory.create(topic, null, "1"));
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic, null, "1"));
+            LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
+            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 30c0a7a51c3..a6b6c649299 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -17,23 +17,19 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -48,29 +44,8 @@
 public class KTableForeachTest {
 
     final private String topicName = "topic";
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private final Properties props = new Properties();
-
-    @Before
-    public void setup() {
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-foreach-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    }
-
-    @After
-    public void cleanup() {
-        props.clear();
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testForeach() {
@@ -101,17 +76,17 @@ public void apply(Integer key, String value) {
         // When
         StreamsBuilder builder = new StreamsBuilder();
         KTable<Integer, String> table = builder.table(topicName,
-                                                      Consumed.with(intSerde, stringSerde),
+                                                      Consumed.with(Serdes.Integer(), Serdes.String()),
                                                       Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
-                                                              .withKeySerde(intSerde)
-                                                              .withValueSerde(stringSerde));
+                                                              .withKeySerde(Serdes.Integer())
+                                                              .withValueSerde(Serdes.String()));
         table.foreach(action);
 
         // Then
-        driver = new TopologyTestDriver(builder.build(), props);
-
-        for (KeyValue<Integer, String> record: inputRecords) {
-            driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (KeyValue<Integer, String> record : inputRecords) {
+                driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+            }
         }
 
         assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index cf4460dfdd0..bcb98566a6d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -44,6 +44,11 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 
+/**
+ * KStreamTestDriver
+ *
+ * @deprecated please use {@link org.apache.kafka.streams.TopologyTestDriver} instead
+ */
 @Deprecated
 public class KStreamTestDriver extends ExternalResource {
 
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index a19b55ce451..940651915f2 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -58,6 +59,28 @@ public static Properties getStreamsConfig(final String applicationId,
 
     }
 
+    public static Properties topologyTestConfig(final String applicationId,
+                                                final String bootstrapServers,
+                                                final String keyDeserializer,
+                                                final String valueDeserializer) {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keyDeserializer);
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueDeserializer);
+        return props;
+    }
+
+    public static Properties topologyTestConfig(final Serde keyDeserializer,
+                                                final Serde valueDeserializer) {
+        return topologyTestConfig(
+                UUID.randomUUID().toString(),
+                "localhost:9091",
+                keyDeserializer.getClass().getName(),
+                valueDeserializer.getClass().getName());
+    }
+
     public static Properties minimalStreamsConfig() {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index f6bbc4b5c27..7b8bdd5664d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -226,7 +226,7 @@ public TopologyTestDriver(final Topology topology,
      * @param builder builder for the topology to be tested
      * @param config the configuration for the topology
      */
-    protected TopologyTestDriver(final InternalTopologyBuilder builder,
+    TopologyTestDriver(final InternalTopologyBuilder builder,
                               final Properties config) {
         this(builder, config,  System.currentTimeMillis());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Rewrite test to use new public TopologyTestDriver
> -------------------------------------------------
>
>                 Key: KAFKA-6474
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6474
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Filipe Agapito
>            Priority: Major
>              Labels: beginner, newbie
>
> With KIP-247 we added public TopologyTestDriver. We should rewrite our own tests to use this new test driver and remove the two classes ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)