You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/22 13:39:27 UTC
[kafka] branch 2.7 updated: MINOR: TopologyTestDriver should not
require dummy parameters (#9477)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new d25c4fd MINOR: TopologyTestDriver should not require dummy parameters (#9477)
d25c4fd is described below
commit d25c4fd7e624219feff4e78a5ea96f8f8dc2bff0
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Oct 22 08:19:01 2020 -0500
MINOR: TopologyTestDriver should not require dummy parameters (#9477)
TopologyTestDriver comes with a paper cut that it passes through a
config requirement that application.id and bootstrap.servers must be
configured. But these configs are not required in the context of
TopologyTestDriver specifically. This change relaxes the requirement.
Reviewers: Boyang Chen <bo...@apache.org>, Matthias J. Sax <mj...@apache.org>
---
.../examples/docs/DeveloperGuideTesting.java | 1 -
.../org/apache/kafka/streams/TopologyTest.java | 1 -
.../kstream/internals/AbstractStreamTest.java | 3 +-
.../kstream/internals/KStreamTransformTest.java | 11 ++-----
.../kstream/internals/KTableAggregateTest.java | 10 +-----
.../KTableKTableForeignKeyJoinScenarioTest.java | 2 --
.../internals/KTableTransformValuesTest.java | 1 -
.../kafka/streams/scala/utils/TestDriver.scala | 7 ++--
.../apache/kafka/streams/TopologyTestDriver.java | 10 +++++-
.../streams/processor/MockProcessorContext.java | 12 +++++--
.../processor/api/MockProcessorContext.java | 10 +++++-
.../kafka/streams/MockProcessorContextTest.java | 2 --
.../org/apache/kafka/streams/TestTopicsTest.java | 9 +----
.../kafka/streams/TopologyTestDriverTest.java | 38 ++++++++++------------
.../wordcount/WindowedWordCountProcessorTest.java | 7 +---
15 files changed, 55 insertions(+), 69 deletions(-)
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index b48c803..333d309 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -73,7 +73,6 @@ public class DeveloperGuideTesting {
// setup test driver
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
testDriver = new TopologyTestDriver(topology, props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 9ecab31..2dd1ee2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -340,7 +340,6 @@ public class TopologyTest {
final String badNodeName = "badGuy";
final Properties config = new Properties();
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
mockStoreBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 6577deb..f71e680 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
@@ -31,7 +32,6 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -85,7 +85,6 @@ public class AbstractStreamTest {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 08bd5e8..2b2b9ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
@@ -39,9 +38,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.junit.Assert.assertEquals;
public class KStreamTransformTest {
@@ -83,10 +79,7 @@ public class KStreamTransformTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
- mkProperties(mkMap(
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test")
- )),
+ new Properties(),
Instant.ofEpochMilli(0L))) {
final TestInputTopic<Integer, Integer> inputTopic =
driver.createInputTopic(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 15ef700..0e86b3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -24,13 +24,13 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
@@ -77,8 +77,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {
@@ -144,8 +142,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {
@@ -182,8 +178,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {
@@ -265,8 +259,6 @@ public class KTableAggregateTest {
final TopologyTestDriver driver = new TopologyTestDriver(
builder.build(),
mkProperties(mkMap(
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
)),
Instant.ofEpochMilli(0L))) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index eb5a4cd..d037193 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -183,7 +183,6 @@ public class KTableKTableForeignKeyJoinScenarioTest {
final String applicationId = "ktable-ktable-joinOnForeignKey";
final Properties streamsConfig = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));
@@ -243,7 +242,6 @@ public class KTableKTableForeignKeyJoinScenarioTest {
final Properties config = new Properties();
final String safeTestName = safeUniqueTestName(getClass(), testName);
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
- config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index f448e31..d66f6be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -472,7 +472,6 @@ public class KTableTransformValuesTest {
public static Properties props() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
index fde1af1..87b86cc 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
@@ -19,20 +19,19 @@
package org.apache.kafka.streams.scala.utils
import java.time.Instant
-import java.util.{Properties, UUID}
+import java.util.Properties
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{StreamsConfig, TestInputTopic, TestOutputTopic, TopologyTestDriver}
+import org.apache.kafka.test.TestUtils
import org.scalatest.Suite
trait TestDriver { this: Suite =>
def createTestDriver(builder: StreamsBuilder, initialWallClockTime: Instant = Instant.now()): TopologyTestDriver = {
val config = new Properties()
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
- config.put(StreamsConfig.STATE_DIR_CONFIG, s"out/state-store-${UUID.randomUUID()}")
+ config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath)
new TopologyTestDriver(builder.build(), config, initialWallClockTime)
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 1b17f67..3078fc9 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -297,7 +297,15 @@ public class TopologyTestDriver implements Closeable {
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
- final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+ final Properties configCopy = new Properties();
+ configCopy.putAll(config);
+ if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
+ }
+ if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
+ configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-topology-test-driver-app-id");
+ }
+ final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
logIfTaskIdleEnabled(streamsConfig);
logContext = new LogContext("topology-test-driver ");
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 88e1660..ac44b74 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -31,9 +31,9 @@ import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.ClientUtils;
-import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import java.io.File;
@@ -218,7 +218,15 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
- final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+ final Properties configCopy = new Properties();
+ configCopy.putAll(config);
+ if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
+ }
+ if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
+ configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-mock-app-id");
+ }
+ final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index e96bdcd..5a537e1 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -240,7 +240,15 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
* @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
*/
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
- final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+ final Properties configCopy = new Properties();
+ configCopy.putAll(config);
+ if (!configCopy.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ configCopy.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-bootstrap-host:0");
+ }
+ if (!configCopy.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
+ configCopy.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-mock-app-id");
+ }
+ final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(configCopy);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 85d0a8b..514b983 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -267,7 +267,6 @@ public class MockProcessorContextTest {
public void shouldCaptureApplicationAndRecordMetadata() {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
final AbstractProcessor<String, Object> processor = new AbstractProcessor<String, Object>() {
@Override
@@ -389,7 +388,6 @@ public class MockProcessorContextTest {
public void fullConstructorShouldSetAllExpectedAttributes() {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
index 3f1b8a4..463bd3f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
@@ -44,9 +44,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
@@ -68,10 +65,6 @@ public class TestTopicsTest {
private final Serde<String> stringSerde = new Serdes.StringSerde();
private final Serde<Long> longSerde = new Serdes.LongSerde();
- private final Properties config = mkProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "TestTopicsTest"),
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
- ));
private final Instant testBaseTime = Instant.parse("2019-06-01T10:00:00Z");
@Before
@@ -82,7 +75,7 @@ public class TestTopicsTest {
final KStream<Long, String> source = builder.stream(INPUT_TOPIC_MAP, Consumed.with(longSerde, stringSerde));
final KStream<String, Long> mapped = source.map((key, value) -> new KeyValue<>(value, key));
mapped.to(OUTPUT_TOPIC_MAP, Produced.with(stringSerde, longSerde));
- testDriver = new TopologyTestDriver(builder.build(), config);
+ testDriver = new TopologyTestDriver(builder.build(), new Properties());
}
@After
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 8cf38c9..4c4858c 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -124,11 +124,7 @@ public class TopologyTestDriverTest {
consumerRecordFactory.create(SOURCE_TOPIC_2, key2, value2, timestamp2);
private TopologyTestDriver testDriver;
- private final Properties config = mkProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
- mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
- ));
+ private final Properties config;
private KeyValueStore<String, Long> store;
private final StringDeserializer stringDeserializer = new StringDeserializer();
@@ -145,7 +141,16 @@ public class TopologyTestDriverTest {
public TopologyTestDriverTest(final boolean eosEnabled) {
if (eosEnabled) {
- config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ config = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
+ ));
+ } else {
+ config = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
+ ));
}
}
@@ -434,6 +439,11 @@ public class TopologyTestDriverTest {
}
@Test
+ public void shouldNotRequireParameters() {
+ new TopologyTestDriver(setupSingleProcessorTopology(), new Properties());
+ }
+
+ @Test
public void shouldInitProcessor() {
testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
assertTrue(mockProcessors.get(0).initialized);
@@ -1527,7 +1537,6 @@ public class TopologyTestDriverTest {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver-cleanup");
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
@@ -1685,10 +1694,6 @@ public class TopologyTestDriverTest {
@Test
public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
- final Properties properties = new Properties();
- properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
- properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
final Topology topology = new Topology();
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
topology.addProcessor(
@@ -1715,7 +1720,7 @@ public class TopologyTestDriverTest {
topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
- try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
@@ -1737,10 +1742,6 @@ public class TopologyTestDriverTest {
@Test
public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
- final Properties properties = new Properties();
- properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
- properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
final Topology topology = new Topology();
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
topology.addGlobalStore(
@@ -1791,7 +1792,7 @@ public class TopologyTestDriverTest {
topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
topology.addSink("globalSink", "global-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
- try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("global-topic", new StringDeserializer(), new StringDeserializer());
@@ -1817,9 +1818,6 @@ public class TopologyTestDriverTest {
@Test
public void shouldRespectTaskIdling() {
final Properties properties = new Properties();
- properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
- properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
// This is the key to this test. Wall-clock time doesn't advance automatically in TopologyTestDriver,
// so with an idle time specified, TTD can't just expect all enqueued records to be processable.
properties.setProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "1000");
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
index a61ead4..608cfea 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.test.wordcount;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
@@ -89,15 +88,11 @@ public class WindowedWordCountProcessorTest {
@Test
public void shouldWorkWithPersistentStore() throws IOException {
- final Properties properties = new Properties();
- properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "");
- properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
-
final File stateDir = TestUtils.tempDirectory();
try {
final MockProcessorContext<String, String> context = new MockProcessorContext<>(
- properties,
+ new Properties(),
new TaskId(0, 0),
stateDir
);