You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/22 13:39:27 UTC

[kafka] branch 2.7 updated: MINOR: TopologyTestDriver should not require dummy parameters (#9477)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new d25c4fd  MINOR: TopologyTestDriver should not require dummy parameters (#9477)
d25c4fd is described below

commit d25c4fd7e624219feff4e78a5ea96f8f8dc2bff0
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Oct 22 08:19:01 2020 -0500

    MINOR: TopologyTestDriver should not require dummy parameters (#9477)
    
    TopologyTestDriver comes with a paper cut: it passes through the
    config requirement that application.id and bootstrap.servers must be
    configured, even though neither config is needed in the context of
    TopologyTestDriver specifically. This change relaxes the requirement.
    
    Reviewers: Boyang Chen <bo...@apache.org>, Matthias J. Sax <mj...@apache.org>
---
 .../examples/docs/DeveloperGuideTesting.java       |  1 -
 .../org/apache/kafka/streams/TopologyTest.java     |  1 -
 .../kstream/internals/AbstractStreamTest.java      |  3 +-
 .../kstream/internals/KStreamTransformTest.java    | 11 ++-----
 .../kstream/internals/KTableAggregateTest.java     | 10 +-----
 .../KTableKTableForeignKeyJoinScenarioTest.java    |  2 --
 .../internals/KTableTransformValuesTest.java       |  1 -
 .../kafka/streams/scala/utils/TestDriver.scala     |  7 ++--
 .../apache/kafka/streams/TopologyTestDriver.java   | 10 +++++-
 .../streams/processor/MockProcessorContext.java    | 12 +++++--
 .../processor/api/MockProcessorContext.java        | 10 +++++-
 .../kafka/streams/MockProcessorContextTest.java    |  2 --
 .../org/apache/kafka/streams/TestTopicsTest.java   |  9 +----
 .../kafka/streams/TopologyTestDriverTest.java      | 38 ++++++++++------------
 .../wordcount/WindowedWordCountProcessorTest.java  |  7 +---
 15 files changed, 55 insertions(+), 69 deletions(-)

diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index b48c803..333d309 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -73,7 +73,6 @@ public class DeveloperGuideTesting {
         // setup test driver
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
         props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
         testDriver = new TopologyTestDriver(topology, props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 9ecab31..2dd1ee2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -340,7 +340,6 @@ public class TopologyTest {
         final String badNodeName = "badGuy";
 
         final Properties config = new Properties();
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         mockStoreBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 6577deb..f71e680 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
@@ -31,7 +32,6 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -85,7 +85,6 @@ public class AbstractStreamTest {
 
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 08bd5e8..2b2b9ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -39,9 +38,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
 
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamTransformTest {
@@ -83,10 +79,7 @@ public class KStreamTransformTest {
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(
             builder.build(),
-            mkProperties(mkMap(
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test")
-            )),
+            new Properties(),
             Instant.ofEpochMilli(0L))) {
             final TestInputTopic<Integer, Integer> inputTopic =
                 driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 15ef700..0e86b3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -24,13 +24,13 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
@@ -77,8 +77,6 @@ public class KTableAggregateTest {
             final TopologyTestDriver driver = new TopologyTestDriver(
                 builder.build(),
                 mkProperties(mkMap(
-                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
-                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
                     mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
                 )),
                 Instant.ofEpochMilli(0L))) {
@@ -144,8 +142,6 @@ public class KTableAggregateTest {
             final TopologyTestDriver driver = new TopologyTestDriver(
                 builder.build(),
                 mkProperties(mkMap(
-                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
-                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
                     mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
                 )),
                 Instant.ofEpochMilli(0L))) {
@@ -182,8 +178,6 @@ public class KTableAggregateTest {
             final TopologyTestDriver driver = new TopologyTestDriver(
                 builder.build(),
                 mkProperties(mkMap(
-                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
-                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
                     mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
                 )),
                 Instant.ofEpochMilli(0L))) {
@@ -265,8 +259,6 @@ public class KTableAggregateTest {
             final TopologyTestDriver driver = new TopologyTestDriver(
                 builder.build(),
                 mkProperties(mkMap(
-                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
-                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
                     mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
                 )),
                 Instant.ofEpochMilli(0L))) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index eb5a4cd..d037193 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -183,7 +183,6 @@ public class KTableKTableForeignKeyJoinScenarioTest {
         final String applicationId = "ktable-ktable-joinOnForeignKey";
         final Properties streamsConfig = mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
-            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
         ));
 
@@ -243,7 +242,6 @@ public class KTableKTableForeignKeyJoinScenarioTest {
         final Properties config = new Properties();
         final String safeTestName = safeUniqueTestName(getClass(), testName);
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
-        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
         config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index f448e31..d66f6be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -472,7 +472,6 @@ public class KTableTransformValuesTest {
     public static Properties props() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
index fde1af1..87b86cc 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
@@ -19,20 +19,19 @@
 package org.apache.kafka.streams.scala.utils
 
 import java.time.Instant
-import java.util.{Properties, UUID}
+import java.util.Properties
 
 import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.streams.scala.StreamsBuilder
 import org.apache.kafka.streams.{StreamsConfig, TestInputTopic, TestOutputTopic, TopologyTestDriver}
+import org.apache.kafka.test.TestUtils
 import org.scalatest.Suite
 
 trait TestDriver { this: Suite =>
 
   def createTestDriver(builder: StreamsBuilder, initialWallClockTime: Instant = Instant.now()): TopologyTestDriver = {
     val config = new Properties()
-    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
-    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
-    config.put(StreamsConfig.STATE_DIR_CONFIG, s"out/state-store-${UUID.randomUUID()}")
+    config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath)
     new TopologyTestDriver(builder.build(), config, initialWallClockTime)
   }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 1b17f67..3078fc9 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -297,7 +297,15 @@ public class TopologyTestDriver implements Closeable {
     private TopologyTestDriver(final InternalTopologyBuilder builder,
                                final Properties config,
                                final long initialWallClockTimeMs) {
-        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        final Properties configCopy = new Properties();
+        configCopy.putAll(config);
+        if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+            configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
+        }
+        if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
+            configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-topology-test-driver-app-id");
+        }
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
         logIfTaskIdleEnabled(streamsConfig);
 
         logContext = new LogContext("topology-test-driver ");
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 88e1660..ac44b74 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -31,9 +31,9 @@ import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
-import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 
 import java.io.File;
@@ -218,7 +218,15 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      */
     @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
-        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        final Properties configCopy = new Properties();
+        configCopy.putAll(config);
+        if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+            configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
+        }
+        if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
+            configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-mock-app-id");
+        }
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
         this.taskId = taskId;
         this.config = streamsConfig;
         this.stateDir = stateDir;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index e96bdcd..5a537e1 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -240,7 +240,15 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
      * @param stateDir a {@link File}, which the context makes available via {@link MockProcessorContext#stateDir()}.
      */
     public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
-        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        final Properties configCopy = new Properties();
+        configCopy.putAll(config);
+        if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+            configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
+        }
+        if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
+            configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-mock-app-id");
+        }
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
         this.taskId = taskId;
         this.config = streamsConfig;
         this.stateDir = stateDir;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 85d0a8b..514b983 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -267,7 +267,6 @@ public class MockProcessorContextTest {
     public void shouldCaptureApplicationAndRecordMetadata() {
         final Properties config = new Properties();
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
 
         final AbstractProcessor<String, Object> processor = new AbstractProcessor<String, Object>() {
             @Override
@@ -389,7 +388,6 @@ public class MockProcessorContextTest {
     public void fullConstructorShouldSetAllExpectedAttributes() {
         final Properties config = new Properties();
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
index 3f1b8a4..463bd3f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
@@ -44,9 +44,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
 
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItems;
@@ -68,10 +65,6 @@ public class TestTopicsTest {
     private final Serde<String> stringSerde = new Serdes.StringSerde();
     private final Serde<Long> longSerde = new Serdes.LongSerde();
 
-    private final Properties config = mkProperties(mkMap(
-            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "TestTopicsTest"),
-            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
-    ));
     private final Instant testBaseTime = Instant.parse("2019-06-01T10:00:00Z");
 
     @Before
@@ -82,7 +75,7 @@ public class TestTopicsTest {
         final KStream<Long, String> source = builder.stream(INPUT_TOPIC_MAP, Consumed.with(longSerde, stringSerde));
         final KStream<String, Long> mapped = source.map((key, value) -> new KeyValue<>(value, key));
         mapped.to(OUTPUT_TOPIC_MAP, Produced.with(stringSerde, longSerde));
-        testDriver = new TopologyTestDriver(builder.build(), config);
+        testDriver = new TopologyTestDriver(builder.build(), new Properties());
     }
 
     @After
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 8cf38c9..4c4858c 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -124,11 +124,7 @@ public class TopologyTestDriverTest {
         consumerRecordFactory.create(SOURCE_TOPIC_2, key2, value2, timestamp2);
 
     private TopologyTestDriver testDriver;
-    private final Properties config = mkProperties(mkMap(
-        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
-        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
-        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
-    ));
+    private final Properties config;
     private KeyValueStore<String, Long> store;
 
     private final StringDeserializer stringDeserializer = new StringDeserializer();
@@ -145,7 +141,16 @@ public class TopologyTestDriverTest {
 
     public TopologyTestDriverTest(final boolean eosEnabled) {
         if (eosEnabled) {
-            config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+            config = mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
+                mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
+            ));
+        } else {
+            config = mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
+            ));
         }
     }
 
@@ -434,6 +439,11 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldNotRequireParameters() {
+        new TopologyTestDriver(setupSingleProcessorTopology(), new Properties());
+    }
+
+    @Test
     public void shouldInitProcessor() {
         testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
         assertTrue(mockProcessors.get(0).initialized);
@@ -1527,7 +1537,6 @@ public class TopologyTestDriverTest {
 
         final Properties config = new Properties();
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver-cleanup");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
@@ -1685,10 +1694,6 @@ public class TopologyTestDriverTest {
 
     @Test
     public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
-        final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
         final Topology topology = new Topology();
         topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
         topology.addProcessor(
@@ -1715,7 +1720,7 @@ public class TopologyTestDriverTest {
         topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
         topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
 
-        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
             final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
 
@@ -1737,10 +1742,6 @@ public class TopologyTestDriverTest {
 
     @Test
     public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
-        final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
         final Topology topology = new Topology();
         topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
         topology.addGlobalStore(
@@ -1791,7 +1792,7 @@ public class TopologyTestDriverTest {
         topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
         topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
 
-        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
             final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
             final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());
 
@@ -1817,9 +1818,6 @@ public class TopologyTestDriverTest {
     @Test
     public void shouldRespectTaskIdling() {
         final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
         // This is the key to this test. Wall-clock time doesn't advance automatically in TopologyTestDriver,
         // so with an idle time specified, TTD can't just expect all enqueued records to be processable.
         properties.setProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "1000");
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
index a61ead4..608cfea 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.test.wordcount;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
@@ -89,15 +88,11 @@ public class WindowedWordCountProcessorTest {
 
     @Test
     public void shouldWorkWithPersistentStore() throws IOException {
-        final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "");
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
-
         final File stateDir = TestUtils.tempDirectory();
 
         try {
             final MockProcessorContext<String, String> context = new MockProcessorContext<>(
-                properties,
+                new Properties(),
                 new TaskId(0, 0),
                 stateDir
             );