You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/07 16:21:25 UTC
[kafka] branch trunk updated: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [cleanup] (#4939)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f641fe KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [cleanup] (#4939)
6f641fe is described below
commit 6f641fef6a88036ab4dacb59ab21bc8b21ef9bcf
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Mon May 7 17:21:20 2018 +0100
KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [cleanup] (#4939)
* Add method to create test properties to StreamsTestUtils
* Make TopologyTestDriver protected constructor package-private
* Add comment suggesting the use of TopologyTestDriver to KStreamTestDriver
* Cleanup:
- GlobalKTableJoinsTest
- KGroupedStreamImplTest
- KGroupedTableImplTest
- KStreamBranchTest
- KStreamFilterTest
- KStreamFlatMapTest
- KStreamFlatMapValuesTest
- KStreamForeachTest
- KStreamGlobalKTableJoinTest
- KStreamGlobalKTableLeftJoinTest
- KStreamImplTest
- KStreamKStreamJoinTest
- KStreamKStreamLeftJoinTest
- KStreamKTableJoinTest
- KStreamKTableLeftJoinTest
- KStreamMapTest
- KStreamMapValuesTest
- KStreamPeekTest
- StreamsBuilderTest
- KStreamSelectKeyTest
- KStreamTransformTest
- KStreamTransformValuesTest
- KStreamWindowAggregateTest
- KTableForeachTest
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../apache/kafka/streams/StreamsBuilderTest.java | 87 +--
.../kstream/internals/GlobalKTableJoinsTest.java | 39 +-
.../kstream/internals/KGroupedStreamImplTest.java | 178 +++--
.../kstream/internals/KGroupedTableImplTest.java | 93 ++-
.../kstream/internals/KStreamBranchTest.java | 33 +-
.../kstream/internals/KStreamFilterTest.java | 42 +-
.../kstream/internals/KStreamFlatMapTest.java | 33 +-
.../internals/KStreamFlatMapValuesTest.java | 44 +-
.../kstream/internals/KStreamForeachTest.java | 39 +-
.../internals/KStreamGlobalKTableJoinTest.java | 25 +-
.../internals/KStreamGlobalKTableLeftJoinTest.java | 24 +-
.../streams/kstream/internals/KStreamImplTest.java | 129 ++--
.../kstream/internals/KStreamKStreamJoinTest.java | 792 ++++++++++-----------
.../internals/KStreamKStreamLeftJoinTest.java | 365 +++++-----
.../kstream/internals/KStreamKTableJoinTest.java | 24 +-
.../internals/KStreamKTableLeftJoinTest.java | 22 +-
.../streams/kstream/internals/KStreamMapTest.java | 43 +-
.../kstream/internals/KStreamMapValuesTest.java | 54 +-
.../streams/kstream/internals/KStreamPeekTest.java | 52 +-
.../kstream/internals/KStreamSelectKeyTest.java | 39 +-
.../kstream/internals/KStreamTransformTest.java | 50 +-
.../internals/KStreamTransformValuesTest.java | 52 +-
.../internals/KStreamWindowAggregateTest.java | 254 +++----
.../kstream/internals/KTableForeachTest.java | 43 +-
.../org/apache/kafka/test/KStreamTestDriver.java | 5 +
.../org/apache/kafka/test/StreamsTestUtils.java | 23 +
.../apache/kafka/streams/TopologyTestDriver.java | 2 +-
27 files changed, 1084 insertions(+), 1502 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 15e55d8..7c2bfa6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -35,9 +35,7 @@ import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Arrays;
@@ -58,26 +56,7 @@ import static org.junit.Assert.assertFalse;
public class StreamsBuilderTest {
private final StreamsBuilder builder = new StreamsBuilder();
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-builder-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@Test(expected = TopologyException.class)
public void testFrom() {
@@ -183,10 +162,10 @@ public class StreamsBuilderTest {
source.process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+ }
// no exception was thrown
assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
@@ -203,10 +182,10 @@ public class StreamsBuilderTest {
source.process(sourceProcessorSupplier);
through.process(throughProcessorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
+ }
assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
@@ -224,13 +203,13 @@ public class StreamsBuilderTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
- driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
- driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
- driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+ driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+ driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+ }
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
}
@@ -250,17 +229,17 @@ public class StreamsBuilderTest {
.withValueSerde(Serdes.String()))
.toStream().foreach(action);
- driver = new TopologyTestDriver(builder.build(), props);
-
final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
- driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
- driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
-
- final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
- assertThat(store.get(1L), equalTo("value1"));
- assertThat(store.get(2L), equalTo("value2"));
- assertThat(results.get(1L), equalTo("value1"));
- assertThat(results.get(2L), equalTo("value2"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+ driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+
+ final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+ assertThat(store.get(1L), equalTo("value1"));
+ assertThat(store.get(2L), equalTo("value2"));
+ assertThat(results.get(1L), equalTo("value1"));
+ assertThat(results.get(2L), equalTo("value2"));
+ }
}
@Test
@@ -270,15 +249,15 @@ public class StreamsBuilderTest {
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
- driver = new TopologyTestDriver(builder.build(), props);
-
final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
- driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
- driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
- final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+ driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+ final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
- assertThat(store.get(1L), equalTo("value1"));
- assertThat(store.get(2L), equalTo("value2"));
+ assertThat(store.get(1L), equalTo("value1"));
+ assertThat(store.get(2L), equalTo("value2"));
+ }
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 8c50afe..da8b102 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -28,8 +27,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +48,6 @@ public class GlobalKTableJoinsTest {
private KStream<String, String> stream;
private KeyValueMapper<String, String, String> keyValueMapper;
private ForeachAction<String, String> action;
- private TopologyTestDriver driver;
@Before
@@ -72,14 +69,6 @@ public class GlobalKTableJoinsTest {
};
}
- @After
- public void cleanup() {
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
@Test
public void shouldLeftJoinWithStream() {
stream
@@ -110,21 +99,17 @@ public class GlobalKTableJoinsTest {
private void verifyJoin(final Map<String, String> expected) {
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
-
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "global-ktable-joins-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
- driver = new TopologyTestDriver(builder.build(), props);
-
- // write some data to the global table
- driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
- driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
- //write some data to the stream
- driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
- driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
- driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
+ final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ // write some data to the global table
+ driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
+ driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
+ //write some data to the stream
+ driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
+ driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
+ driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
+ }
assertEquals(expected, results);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index b9ca30f..e7a9226 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -50,8 +49,7 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -78,28 +76,13 @@ public class KGroupedStreamImplTest {
private KGroupedStream<String, String> groupedStream;
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-stream-impl-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
}
@SuppressWarnings("deprecation")
@@ -224,13 +207,14 @@ public class KGroupedStreamImplTest {
}
private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
- driver = new TopologyTestDriver(builder.build(), props);
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+ }
assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -298,13 +282,14 @@ public class KGroupedStreamImplTest {
}
private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
- driver = new TopologyTestDriver(builder.build(), props);
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+ }
assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -341,13 +326,14 @@ public class KGroupedStreamImplTest {
}
private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
- driver = new TopologyTestDriver(builder.build(), props);
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+ }
assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -554,26 +540,30 @@ public class KGroupedStreamImplTest {
public void shouldCountAndMaterializeResults() {
groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(driver);
- final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
+ final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
- assertThat(count.get("1"), equalTo(3L));
- assertThat(count.get("2"), equalTo(1L));
- assertThat(count.get("3"), equalTo(2L));
+ assertThat(count.get("1"), equalTo(3L));
+ assertThat(count.get("2"), equalTo(1L));
+ assertThat(count.get("3"), equalTo(2L));
+ }
}
@Test
public void shouldLogAndMeasureSkipsInAggregate() {
groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- processData();
- LogCaptureAppender.unregister(appender);
-
- final Map<MetricName, ? extends Metric> metrics = driver.metrics();
- assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
- assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(driver);
+ LogCaptureAppender.unregister(appender);
+
+ final Map<MetricName, ? extends Metric> metrics = driver.metrics();
+ assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+ assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+ }
}
@@ -586,13 +576,15 @@ public class KGroupedStreamImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(driver);
- final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
+ final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
- assertThat(reduced.get("1"), equalTo("A+C+D"));
- assertThat(reduced.get("2"), equalTo("B"));
- assertThat(reduced.get("3"), equalTo("E+F"));
+ assertThat(reduced.get("1"), equalTo("A+C+D"));
+ assertThat(reduced.get("2"), equalTo("B"));
+ assertThat(reduced.get("3"), equalTo("E+F"));
+ }
}
@Test
@@ -605,13 +597,15 @@ public class KGroupedStreamImplTest {
);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- processData();
- LogCaptureAppender.unregister(appender);
-
- final Map<MetricName, ? extends Metric> metrics = driver.metrics();
- assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
- assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(driver);
+ LogCaptureAppender.unregister(appender);
+
+ final Map<MetricName, ? extends Metric> metrics = driver.metrics();
+ assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+ assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
+ }
}
@@ -625,13 +619,15 @@ public class KGroupedStreamImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(driver);
- final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
+ final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
- assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
- assertThat(aggregate.get("2"), equalTo("0+B"));
- assertThat(aggregate.get("3"), equalTo("0+E+F"));
+ assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
+ assertThat(aggregate.get("2"), equalTo("0+B"));
+ assertThat(aggregate.get("3"), equalTo("0+E+F"));
+ }
}
@SuppressWarnings("unchecked")
@@ -649,15 +645,16 @@ public class KGroupedStreamImplTest {
}
});
- processData();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(driver);
- assertThat(results.get("1"), equalTo("0+A+C+D"));
- assertThat(results.get("2"), equalTo("0+B"));
- assertThat(results.get("3"), equalTo("0+E+F"));
+ assertThat(results.get("1"), equalTo("0+A+C+D"));
+ assertThat(results.get("2"), equalTo("0+B"));
+ assertThat(results.get("3"), equalTo("0+E+F"));
+ }
}
- private void processData() {
- driver = new TopologyTestDriver(builder.build(), props);
+ private void processData(final TopologyTestDriver driver) {
driver.pipeInput(recordFactory.create(TOPIC, "1", "A"));
driver.pipeInput(recordFactory.create(TOPIC, "2", "B"));
driver.pipeInput(recordFactory.create(TOPIC, "1", "C"));
@@ -668,22 +665,23 @@ public class KGroupedStreamImplTest {
}
private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
- driver = new TopologyTestDriver(builder.build(), props);
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
- driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+ }
assertThat(results, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
- KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
+ KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
)));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 742f349..05d339f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
@@ -39,8 +38,7 @@ import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -60,29 +58,13 @@ public class KGroupedTableImplTest {
private final StreamsBuilder builder = new StreamsBuilder();
private static final String INVALID_STORE_NAME = "~foo bar~";
private KGroupedTable<String, String> groupedTable;
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
private final String topic = "input";
@Before
public void before() {
groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
.groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
-
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-table-impl-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
}
@Test
@@ -140,30 +122,31 @@ public class KGroupedTableImplTest {
groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (StateStoreSupplier<KeyValueStore>) null);
}
- private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) {
- final Map<String, Integer> results = new HashMap<>();
- final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
- reduced.foreach(new ForeachAction<String, Integer>() {
+ private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
+ final Map<String, Integer> reducedResults = new HashMap<>();
+ inputKTable.foreach(new ForeachAction<String, Integer>() {
@Override
public void apply(final String key, final Integer value) {
- results.put(key, value);
+ reducedResults.put(key, value);
}
});
-
- driver = new TopologyTestDriver(builder.build(), props);
+ return reducedResults;
+ }
+ private void assertReduced(final Map<String, Integer> reducedResults, final String topic, final TopologyTestDriver driver) {
+ final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
- assertEquals(Integer.valueOf(1), results.get("A"));
- assertEquals(Integer.valueOf(2), results.get("B"));
+ assertEquals(Integer.valueOf(1), reducedResults.get("A"));
+ assertEquals(Integer.valueOf(2), reducedResults.get("B"));
driver.pipeInput(recordFactory.create(topic, "A", 2.6, 10));
driver.pipeInput(recordFactory.create(topic, "B", 1.3, 10));
driver.pipeInput(recordFactory.create(topic, "A", 5.7, 10));
driver.pipeInput(recordFactory.create(topic, "B", 6.2, 10));
- assertEquals(Integer.valueOf(5), results.get("A"));
- assertEquals(Integer.valueOf(6), results.get("B"));
+ assertEquals(Integer.valueOf(5), reducedResults.get("A"));
+ assertEquals(Integer.valueOf(6), reducedResults.get("B"));
}
@Test
@@ -184,8 +167,11 @@ public class KGroupedTableImplTest {
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
- doShouldReduce(reduced, topic);
- assertEquals(reduced.queryableStoreName(), "reduced");
+ final Map<String, Integer> results = getReducedResults(reduced);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ assertReduced(results, topic, driver);
+ assertEquals(reduced.queryableStoreName(), "reduced");
+ }
}
@Test
@@ -206,8 +192,11 @@ public class KGroupedTableImplTest {
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
- doShouldReduce(reduced, topic);
- assertNull(reduced.queryableStoreName());
+ final Map<String, Integer> results = getReducedResults(reduced);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ assertReduced(results, topic, driver);
+ assertNull(reduced.queryableStoreName());
+ }
}
@SuppressWarnings("unchecked")
@@ -229,10 +218,13 @@ public class KGroupedTableImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Integer()));
- doShouldReduce(reduced, topic);
- final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
- assertThat(reduce.get("A"), equalTo(5));
- assertThat(reduce.get("B"), equalTo(6));
+ final Map<String, Integer> results = getReducedResults(reduced);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ assertReduced(results, topic, driver);
+ final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
+ assertThat(reduce.get("A"), equalTo(5));
+ assertThat(reduce.get("B"), equalTo(6));
+ }
}
@SuppressWarnings("unchecked")
@@ -246,10 +238,12 @@ public class KGroupedTableImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
- processData(topic);
- final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
- assertThat(counts.get("1"), equalTo(3L));
- assertThat(counts.get("2"), equalTo(2L));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(topic, driver);
+ final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
+ assertThat(counts.get("1"), equalTo(3L));
+ assertThat(counts.get("2"), equalTo(2L));
+ }
}
@SuppressWarnings("unchecked")
@@ -266,10 +260,12 @@ public class KGroupedTableImplTest {
.withValueSerde(Serdes.String())
.withKeySerde(Serdes.String()));
- processData(topic);
- final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
- assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
- assertThat(aggregate.get("2"), equalTo("0+2+2"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ processData(topic, driver);
+ final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
+ assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
+ assertThat(aggregate.get("2"), equalTo("0+2+2"));
+ }
}
@SuppressWarnings("unchecked")
@@ -327,8 +323,7 @@ public class KGroupedTableImplTest {
(Materialized) null);
}
- private void processData(final String topic) {
- driver = new TopologyTestDriver(builder.build(), props);
+ private void processData(final String topic, final TopologyTestDriver driver) {
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
driver.pipeInput(recordFactory.create(topic, "A", "1"));
driver.pipeInput(recordFactory.create(topic, "B", "1"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index bd3d60b..2aa8aac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -21,16 +21,13 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.List;
@@ -42,26 +39,7 @@ public class KStreamBranchTest {
private final String topicName = "topic";
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void before() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-branch-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@SuppressWarnings("unchecked")
@Test
@@ -102,9 +80,10 @@ public class KStreamBranchTest {
branches[i].process(supplier);
}
- driver = new TopologyTestDriver(builder.build(), props);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ }
}
final List<MockProcessor<Integer, String>> processors = supplier.capturedProcessors(3);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index d338fe3..51a994b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -21,15 +21,12 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Properties;
@@ -40,28 +37,9 @@ public class KStreamFilterTest {
private final String topicName = "topic";
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void before() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-filter-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
- private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
+ private final Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
@Override
public boolean test(Integer key, String value) {
return (key % 3) == 0;
@@ -80,9 +58,10 @@ public class KStreamFilterTest {
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filter(isMultipleOfThree).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ }
}
assertEquals(2, supplier.theCapturedProcessor().processed.size());
@@ -100,9 +79,10 @@ public class KStreamFilterTest {
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filterNot(isMultipleOfThree).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ }
}
assertEquals(5, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 9ce24b5..3173dcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -22,15 +22,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.ArrayList;
@@ -42,26 +39,7 @@ public class KStreamFlatMapTest {
private String topicName = "topic";
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void before() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
@Test
public void testFlatMap() {
@@ -88,9 +66,10 @@ public class KStreamFlatMapTest {
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.flatMap(mapper).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ }
}
assertEquals(6, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 221b02b..471b127 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -20,16 +20,13 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.ArrayList;
@@ -41,26 +38,7 @@ public class KStreamFlatMapValuesTest {
private String topicName = "topic";
private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void before() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-values-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
@Test
public void testFlatMapValues() {
@@ -83,10 +61,11 @@ public class KStreamFlatMapValuesTest {
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
stream.flatMapValues(mapper).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (final int expectedKey : expectedKeys) {
- // passing the timestamp to recordFactory.create to disambiguate the call
- driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (final int expectedKey : expectedKeys) {
+ // passing the timestamp to recordFactory.create to disambiguate the call
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+ }
}
String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
@@ -117,10 +96,11 @@ public class KStreamFlatMapValuesTest {
stream.flatMapValues(mapper).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (final int expectedKey : expectedKeys) {
- // passing the timestamp to recordFactory.create to disambiguate the call
- driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (final int expectedKey : expectedKeys) {
+ // passing the timestamp to recordFactory.create to disambiguate the call
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
+ }
}
String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index b975c96..83a20a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -17,20 +17,16 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.ArrayList;
@@ -45,29 +41,7 @@ public class KStreamForeachTest {
private final String topicName = "topic";
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private Properties props = new Properties();
-
- @Before
- public void before() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-foreach-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
- private final Serde<Integer> intSerde = Serdes.Integer();
- private final Serde<String> stringSerde = Serdes.String();
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
@Test
public void testForeach() {
@@ -97,13 +71,14 @@ public class KStreamForeachTest {
// When
StreamsBuilder builder = new StreamsBuilder();
- KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+ KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.foreach(action);
// Then
- driver = new TopologyTestDriver(builder.build(), props);
- for (KeyValue<Integer, String> record: inputRecords) {
- driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (KeyValue<Integer, String> record : inputRecords) {
+ driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+ }
}
assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 6e5b816..c37e8a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -17,22 +17,20 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -47,8 +45,6 @@ public class KStreamGlobalKTableJoinTest {
private final String streamTopic = "streamTopic";
private final String globalTableTopic = "globalTableTopic";
- private final Serde<Integer> intSerde = Serdes.Integer();
- private final Serde<String> stringSerde = Serdes.String();
private TopologyTestDriver driver;
private MockProcessor<Integer, String> processor;
private final int[] expectedKeys = {0, 1, 2, 3};
@@ -63,8 +59,8 @@ public class KStreamGlobalKTableJoinTest {
final KeyValueMapper<Integer, String, String> keyMapper;
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
- final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+ final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
+ final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
table = builder.globalTable(globalTableTopic, tableConsumed);
keyMapper = new KeyValueMapper<Integer, String, String>() {
@@ -78,13 +74,7 @@ public class KStreamGlobalKTableJoinTest {
};
stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+ final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
driver = new TopologyTestDriver(builder.build(), props);
processor = supplier.theCapturedProcessor();
@@ -92,10 +82,7 @@ public class KStreamGlobalKTableJoinTest {
@After
public void cleanup() {
- if (driver != null) {
- driver.close();
- }
- driver = null;
+ driver.close();
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index b3551ba..eb0775a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -17,13 +17,11 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
@@ -32,7 +30,8 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -46,8 +45,6 @@ public class KStreamGlobalKTableLeftJoinTest {
final private String streamTopic = "streamTopic";
final private String globalTableTopic = "globalTableTopic";
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
private MockProcessor<Integer, String> processor;
private TopologyTestDriver driver;
@@ -64,8 +61,8 @@ public class KStreamGlobalKTableLeftJoinTest {
final KeyValueMapper<Integer, String, String> keyMapper;
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
- final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
+ final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
+ final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
table = builder.globalTable(globalTableTopic, tableConsumed);
keyMapper = new KeyValueMapper<Integer, String, String>() {
@@ -79,18 +76,17 @@ public class KStreamGlobalKTableLeftJoinTest {
};
stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+ final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
driver = new TopologyTestDriver(builder.build(), props);
processor = supplier.theCapturedProcessor();
}
+ @After
+ public void cleanup() {
+ driver.close();
+ }
+
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 797575d..49e8aaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
@@ -24,7 +23,6 @@ import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -49,8 +47,7 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -69,48 +66,28 @@ import static org.junit.Assert.fail;
public class KStreamImplTest {
- private final Serde<String> stringSerde = Serdes.String();
- private final Serde<Integer> intSerde = Serdes.Integer();
- private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
-
private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
private KStream<String, String> testStream;
private StreamsBuilder builder;
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@Before
public void before() {
builder = new StreamsBuilder();
testStream = builder.stream("source");
-
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-impl-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
}
@Test
public void testNumProcesses() {
final StreamsBuilder builder = new StreamsBuilder();
- KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
+ KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
- KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
+ KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
KStream<String, String> stream1 =
source1.filter(new Predicate<String, String>() {
@@ -170,7 +147,7 @@ public class KStreamImplTest {
);
final int anyWindowSize = 1;
- final Joined<String, Integer, Integer> joined = Joined.with(stringSerde, intSerde, intSerde);
+ final Joined<String, Integer, Integer> joined = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) {
@@ -205,9 +182,8 @@ public class KStreamImplTest {
@Test
public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
final StreamsBuilder builder = new StreamsBuilder();
- final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
- KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
- KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
+ KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
+ KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
stream1.to("topic-5");
stream2.through("topic-6");
@@ -224,11 +200,12 @@ public class KStreamImplTest {
public void shouldSendDataThroughTopicUsingProduced() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
- final KStream<String, String> stream = builder.stream(input, consumed);
- stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
+ final KStream<String, String> stream = builder.stream(input, stringConsumed);
+ stream.through("through-topic", Produced.with(Serdes.String(), Serdes.String())).process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
- driver.pipeInput(recordFactory.create(input, "a", "b"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(input, "a", "b"));
+ }
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("a:b")));
}
@@ -236,12 +213,13 @@ public class KStreamImplTest {
public void shouldSendDataToTopicUsingProduced() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
- final KStream<String, String> stream = builder.stream(input, consumed);
- stream.to("to-topic", Produced.with(stringSerde, stringSerde));
- builder.stream("to-topic", consumed).process(processorSupplier);
+ final KStream<String, String> stream = builder.stream(input, stringConsumed);
+ stream.to("to-topic", Produced.with(Serdes.String(), Serdes.String()));
+ builder.stream("to-topic", stringConsumed).process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
- driver.pipeInput(recordFactory.create(input, "e", "f"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(input, "e", "f"));
+ }
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f")));
}
@@ -249,7 +227,7 @@ public class KStreamImplTest {
// TODO: this test should be refactored when we removed KStreamBuilder so that the created Topology contains internal topics as well
public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
final KStreamBuilder builder = new KStreamBuilder();
- KStream<String, String> kStream = builder.stream(stringSerde, stringSerde, "topic-1");
+ KStream<String, String> kStream = builder.stream(Serdes.String(), Serdes.String(), "topic-1");
ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
final KStream<String, String> stream = kStream
@@ -282,9 +260,8 @@ public class KStreamImplTest {
@Test
public void testToWithNullValueSerdeDoesntNPE() {
final StreamsBuilder builder = new StreamsBuilder();
- final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
- final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), consumed);
- inputStream.to(stringSerde, null, "output");
+ final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), stringConsumed);
+ inputStream.to(Serdes.String(), null, "output");
}
@Test(expected = NullPointerException.class)
@@ -477,7 +454,7 @@ public class KStreamImplTest {
@Test
public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
- final KTable<String, String> table = builder.table("blah", consumed);
+ final KTable<String, String> table = builder.table("blah", stringConsumed);
try {
testStream.leftJoin(table,
MockValueJoiner.TOSTRING_JOINER,
@@ -490,7 +467,7 @@ public class KStreamImplTest {
@Test
public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
- final KTable<String, String> table = builder.table("blah", consumed);
+ final KTable<String, String> table = builder.table("blah", stringConsumed);
try {
testStream.join(table,
MockValueJoiner.TOSTRING_JOINER,
@@ -522,12 +499,12 @@ public class KStreamImplTest {
merged.process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
- driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
- driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
- driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+ driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+ driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
+ }
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
}
@@ -547,16 +524,16 @@ public class KStreamImplTest {
merged.process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
- driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
- driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
- driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
- driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
- driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
- driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
- driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+ driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
+ driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
+ driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
+ driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
+ driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
+ driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
+ }
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
processorSupplier.theCapturedProcessor().processed);
@@ -568,13 +545,13 @@ public class KStreamImplTest {
pattern2Source.process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
- driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
- driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
- driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
- driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+ driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+ driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
+ driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
+ driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
+ }
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
processorSupplier.theCapturedProcessor().processed);
@@ -591,13 +568,13 @@ public class KStreamImplTest {
merged.process(processorSupplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
- driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
- driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
- driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
- driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+ driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+ driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
+ driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
+ driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
+ }
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
processorSupplier.theCapturedProcessor().processed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 5d849ee..de3446c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -17,26 +17,22 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Arrays;
@@ -55,38 +51,16 @@ public class KStreamKStreamJoinTest {
final private String topic1 = "topic1";
final private String topic2 = "topic2";
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
-
- private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setUp() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-join-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@Test
public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, Integer> left = builder.stream("left", Consumed.with(stringSerde, intSerde));
- final KStream<String, Integer> right = builder.stream("right", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+ final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
left.join(
@@ -98,17 +72,18 @@ public class KStreamKStreamJoinTest {
}
},
JoinWindows.of(100),
- Joined.with(stringSerde, intSerde, intSerde)
+ Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver = new TopologyTestDriver(builder.build(), props);
- driver.pipeInput(recordFactory.create("left", "A", null));
- LogCaptureAppender.unregister(appender);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create("left", "A", null));
+ LogCaptureAppender.unregister(appender);
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
- assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ }
}
@Test
@@ -127,7 +102,7 @@ public class KStreamKStreamJoinTest {
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
- Joined.with(intSerde, stringSerde, stringSerde));
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -135,81 +110,82 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver = new TopologyTestDriver(builder.build(), props);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
- // push two items to the primary stream. the other window is empty
- // w1 = {}
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // w2 = {}
+ // push two items to the primary stream. the other window is empty
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = {}
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
- }
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult();
- // push two items to the other stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
- }
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ }
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- // push all four items to the primary stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
+ // push all four items to the primary stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
- }
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ }
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- // push all items to the other stream. this should produce six items.
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // push all items to the other stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
- }
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- // push all four items to the primary stream. this should produce six items.
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // push all four items to the primary stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
- }
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
- // push two items to the other stream. this should produce six item.
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+ // push two items to the other stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
- }
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+ }
- processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ }
}
@Test
@@ -229,88 +205,89 @@ public class KStreamKStreamJoinTest {
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
- Joined.with(intSerde, stringSerde, stringSerde));
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver = new TopologyTestDriver(builder.build(), props, 0L);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
- // push two items to the primary stream. the other window is empty.this should produce two items
- // w1 = {}
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // w2 = {}
+ // push two items to the primary stream. the other window is empty. this should produce two items
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = {}
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
- }
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ }
- processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
- // push two items to the other stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
- }
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ }
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- // push all four items to the primary stream. this should produce four items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
+ // push all four items to the primary stream. this should produce four items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
- }
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ }
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
- // push all items to the other stream. this should produce six items.
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // push all items to the other stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
- }
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- // push all four items to the primary stream. this should produce six items.
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // push all four items to the primary stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
- }
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
- // push two items to the other stream. this should produce six item.
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+ // push two items to the other stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
- }
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
+ }
- processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ }
}
@Test
@@ -332,7 +309,7 @@ public class KStreamKStreamJoinTest {
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
- Joined.with(intSerde, stringSerde, stringSerde));
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -340,197 +317,198 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver = new TopologyTestDriver(builder.build(), props, time);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
- // push two items to the primary stream. the other window is empty. this should produce no items.
- // w1 = {}
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // w2 = {}
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
- }
+ // push two items to the primary stream. the other window is empty. this should produce no items.
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+ }
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult();
- // push two items to the other stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
- }
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
+ }
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
- // clear logically
- time = 1000L;
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
- }
- processor.checkAndClearProcessResult();
+ // clear logically
+ time = 1000L;
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+ }
+ processor.checkAndClearProcessResult();
- // gradually expires items in w1
- // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
+ // gradually expires items in w1
+ // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
- time += 100L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 100L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("3:X3+YY3");
+ processor.checkAndClearProcessResult("3:X3+YY3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult();
- // go back to the time before expiration
+ // go back to the time before expiration
- time = 1000L - 100L - 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time = 1000L - 100L - 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult();
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:X0+YY0");
+ processor.checkAndClearProcessResult("0:X0+YY0");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
- time += 1;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
- time += 1;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ time += 1;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- // clear (logically)
- time = 2000L;
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
- }
+ // clear (logically)
+ time = 2000L;
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult();
- // gradually expires items in w2
- // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+ // gradually expires items in w2
+ // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
- time = 2000L + 100L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time = 2000L + 100L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("3:XX3+Y3");
+ processor.checkAndClearProcessResult("3:XX3+Y3");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult();
- // go back to the time before expiration
+ // go back to the time before expiration
- time = 2000L - 100L - 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time = 2000L - 100L - 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult();
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0");
+ processor.checkAndClearProcessResult("0:XX0+Y0");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ }
}
@Test
@@ -552,9 +530,9 @@ public class KStreamKStreamJoinTest {
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(0).after(100),
- Joined.with(intSerde,
- stringSerde,
- stringSerde));
+ Joined.with(Serdes.Integer(),
+ Serdes.String(),
+ Serdes.String()));
joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -562,85 +540,85 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver = new TopologyTestDriver(builder.build(), props, time);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
- }
- processor.checkAndClearProcessResult();
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+ }
+ processor.checkAndClearProcessResult();
+ time = 1000L - 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time = 1000L - 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult();
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0");
- processor.checkAndClearProcessResult("0:X0+YY0");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ time = 1000 + 100L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time = 1000 + 100L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("3:X3+YY3");
- processor.checkAndClearProcessResult("3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ processor.checkAndClearProcessResult();
}
-
- processor.checkAndClearProcessResult();
}
@Test
@@ -663,7 +641,7 @@ public class KStreamKStreamJoinTest {
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(0).before(100),
- Joined.with(intSerde, stringSerde, stringSerde));
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -671,84 +649,84 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver = new TopologyTestDriver(builder.build(), props, time);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
- }
- processor.checkAndClearProcessResult();
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
+ }
+ processor.checkAndClearProcessResult();
+ time = 1000L - 100L - 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time = 1000L - 100L - 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult();
- processor.checkAndClearProcessResult();
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0");
- processor.checkAndClearProcessResult("0:X0+YY0");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ time = 1000L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time = 1000L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
- processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
- }
+ processor.checkAndClearProcessResult("3:X3+YY3");
- processor.checkAndClearProcessResult("3:X3+YY3");
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ }
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
+ processor.checkAndClearProcessResult();
}
-
- processor.checkAndClearProcessResult();
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index c67e13d..11c5c5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -17,24 +17,20 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Arrays;
@@ -50,30 +46,9 @@ public class KStreamKStreamLeftJoinTest {
final private String topic1 = "topic1";
final private String topic2 = "topic2";
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
- private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private Properties props = new Properties();
-
- @Before
- public void setUp() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-left-join-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@Test
public void testLeftJoin() {
@@ -91,7 +66,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
- Joined.with(intSerde, stringSerde, stringSerde));
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -99,65 +74,66 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver = new TopologyTestDriver(builder.build(), props, 0L);
-
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-
- // push two items to the primary stream. the other window is empty
- // w1 {}
- // w2 {}
- // --> w1 = { 0:X0, 1:X1 }
- // --> w2 = {}
-
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
- }
- processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
- // push two items to the other stream. this should produce two items.
- // w1 = { 0:X0, 1:X1 }
- // w2 {}
- // --> w1 = { 0:X0, 1:X1 }
- // --> w2 = { 0:Y0, 1:Y1 }
-
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
- }
-
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-
- // push three items to the primary stream. this should produce four items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
- // --> w2 = { 0:Y0, 1:Y1 }
-
- for (int i = 0; i < 3; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+ // push two items to the primary stream. the other window is empty
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // --> w2 = {}
+
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ }
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // --> w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ }
+
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+ // push three items to the primary stream. this should produce three items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+ // --> w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < 3; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ }
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
+
+ // push all items to the other stream. this should produce 5 items
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+ // w2 = { 0:Y0, 1:Y1 }
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+ // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ }
+ processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
+
+ // push all four items to the primary stream. this should produce six items.
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+ // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+ // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+ // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
}
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
-
- // push all items to the other stream. this should produce 5 items
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
- // w2 = { 0:Y0, 1:Y1 }
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
- // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
- }
- processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
-
- // push all four items to the primary stream. this should produce six items.
- // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
- // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
- // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
- // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
-
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
- }
- processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
}
@Test
@@ -176,7 +152,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(100),
- Joined.with(intSerde, stringSerde, stringSerde));
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -184,111 +160,112 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver = new TopologyTestDriver(builder.build(), props, time);
-
- final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
-
- // push two items to the primary stream. the other window is empty. this should produce two items
- // w1 = {}
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // --> w2 = {}
-
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
- }
- processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-
- // push two items to the other stream. this should produce no items.
- // w1 = { 0:X0, 1:X1 }
- // w2 = {}
- // --> w1 = { 0:X0, 1:X1 }
- // --> w2 = { 0:Y0, 1:Y1 }
-
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
- }
- processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-
- // clear logically
- time = 1000L;
-
- // push all items to the other stream. this should produce no items.
- // w1 = {}
- // w2 = {}
- // --> w1 = {}
- // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
- for (int i = 0; i < expectedKeys.length; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
- }
- processor.checkAndClearProcessResult();
-
- // gradually expire items in window 2.
- // w1 = {}
- // w2 = {}
- // --> w1 = {}
- // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
-
- time = 1000L + 100L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
- // go back to the time before expiration
-
- time = 1000L - 100L - 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
- }
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
-
- time += 1L;
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, time)) {
+
+ final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+ // push two items to the primary stream. the other window is empty. this should produce two items
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // --> w2 = {}
+
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
+ }
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
+
+ // push two items to the other stream. this should produce two items.
+ // w1 = { 0:X0, 1:X1 }
+ // w2 = {}
+ // --> w1 = { 0:X0, 1:X1 }
+ // --> w2 = { 0:Y0, 1:Y1 }
+
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
+ }
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+ // clear logically
+ time = 1000L;
+
+ // push all items to the other stream. this should produce no items.
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = {}
+ // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+ for (int i = 0; i < expectedKeys.length; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
+ }
+ processor.checkAndClearProcessResult();
+
+ // gradually expire items in window 2.
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = {}
+ // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+
+ time = 1000L + 100L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+ // go back to the time before expiration
+
+ time = 1000L - 100L - 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
+ }
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
}
- processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index ec31b5a..0ce27ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -17,22 +17,21 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -52,8 +51,6 @@ public class KStreamKTableJoinTest {
private final String streamTopic = "streamTopic";
private final String tableTopic = "tableTopic";
- private final Serde<Integer> intSerde = Serdes.Integer();
- private final Serde<String> stringSerde = Serdes.String();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
private final int[] expectedKeys = {0, 1, 2, 3};
@@ -70,23 +67,22 @@ public class KStreamKTableJoinTest {
final KTable<Integer, String> table;
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
stream = builder.stream(streamTopic, consumed);
table = builder.table(tableTopic, consumed);
stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+ final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
driver = new TopologyTestDriver(builder.build(), props, 0L);
processor = supplier.theCapturedProcessor();
}
+ @After
+ public void cleanup() {
+ driver.close();
+ }
+
private void pushToStream(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 735f71c..eedda07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -17,13 +17,11 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -31,7 +29,8 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -48,8 +47,6 @@ public class KStreamKTableLeftJoinTest {
final private String streamTopic = "streamTopic";
final private String tableTopic = "tableTopic";
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
private TopologyTestDriver driver;
private MockProcessor<Integer, String> processor;
@@ -66,23 +63,22 @@ public class KStreamKTableLeftJoinTest {
final KTable<Integer, String> table;
final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
stream = builder.stream(streamTopic, consumed);
table = builder.table(tableTopic, consumed);
stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-
+ final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
driver = new TopologyTestDriver(builder.build(), props, 0L);
processor = supplier.theCapturedProcessor();
}
+ @After
+ public void cleanup() {
+ driver.close();
+ }
+
private void pushToStream(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index b0a383b..b55d8e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -17,21 +17,17 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Properties;
@@ -41,30 +37,8 @@ import static org.junit.Assert.assertEquals;
public class KStreamMapTest {
private String topicName = "topic";
-
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
@Test
public void testMap() {
@@ -80,15 +54,14 @@ public class KStreamMapTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
- KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
- MockProcessorSupplier<String, Integer> supplier;
-
- supplier = new MockProcessorSupplier<>();
+ MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+ KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.map(mapper).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ }
}
assertEquals(4, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index ed11038..95593aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -17,21 +17,17 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Properties;
@@ -41,33 +37,9 @@ import static org.junit.Assert.assertArrayEquals;
public class KStreamMapValuesTest {
private String topicName = "topic";
-
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
- final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
-
-
+ private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-values-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
@Test
public void testFlatMapValues() {
@@ -83,13 +55,13 @@ public class KStreamMapValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
- KStream<Integer, String> stream;
- stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+ KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.mapValues(mapper).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+ }
}
String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
@@ -110,13 +82,13 @@ public class KStreamMapValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
- KStream<Integer, String> stream;
- stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
+ KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.mapValues(mapper).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+ }
}
String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 2c6ff81..137aa6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -17,20 +17,16 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.ArrayList;
@@ -43,53 +39,33 @@ import static org.junit.Assert.fail;
public class KStreamPeekTest {
private final String topicName = "topic";
- private final Serde<Integer> intSerd = Serdes.Integer();
- private final Serde<String> stringSerd = Serdes.String();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-peek-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
@Test
public void shouldObserveStreamElements() {
final StreamsBuilder builder = new StreamsBuilder();
- final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
+ final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
- driver = new TopologyTestDriver(builder.build(), props);
- final List<KeyValue<Integer, String>> expected = new ArrayList<>();
- for (int key = 0; key < 32; key++) {
- final String value = "V" + key;
- driver.pipeInput(recordFactory.create(topicName, key, value));
- expected.add(new KeyValue<>(key, value));
- }
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final List<KeyValue<Integer, String>> expected = new ArrayList<>();
+ for (int key = 0; key < 32; key++) {
+ final String value = "V" + key;
+ driver.pipeInput(recordFactory.create(topicName, key, value));
+ expected.add(new KeyValue<>(key, value));
+ }
- assertEquals(expected, peekObserved);
- assertEquals(expected, streamObserved);
+ assertEquals(expected, peekObserved);
+ assertEquals(expected, streamObserved);
+ }
}
@Test
public void shouldNotAllowNullAction() {
final StreamsBuilder builder = new StreamsBuilder();
- final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
+ final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
try {
stream.peek(null);
fail("expected null action to throw NPE");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 1abc0b9..b030233 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -17,21 +17,17 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.HashMap;
@@ -44,29 +40,8 @@ public class KStreamSelectKeyTest {
private String topicName = "topic_key_select";
- final private Serde<Integer> integerSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(topicName, new StringSerializer(), new IntegerSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-select-key-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
@Test
public void testSelectKey() {
@@ -88,16 +63,16 @@ public class KStreamSelectKeyTest {
final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"};
final int[] expectedValues = new int[]{1, 2, 3};
- KStream<String, Integer> stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));
+ KStream<String, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.String(), Serdes.Integer()));
MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
stream.selectKey(selector).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- for (int expectedValue : expectedValues) {
- driver.pipeInput(recordFactory.create(expectedValue));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (int expectedValue : expectedValues) {
+ driver.pipeInput(recordFactory.create(expectedValue));
+ }
}
assertEquals(3, supplier.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 1567fe1..8a05aac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -17,12 +17,10 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
@@ -33,9 +31,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Rule;
import org.junit.Test;
@@ -47,33 +43,12 @@ public class KStreamTransformTest {
private String topicName = "topic";
- final private Serde<Integer> intSerde = Serdes.Integer();
-
private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
@Rule
public final KStreamTestDriver kstreamDriver = new KStreamTestDriver();
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
@Test
public void testTransform() {
StreamsBuilder builder = new StreamsBuilder();
@@ -102,7 +77,7 @@ public class KStreamTransformTest {
final int[] expectedKeys = {1, 10, 100, 1000};
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
- KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+ KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(transformerSupplier).process(processor);
kstreamDriver.setUp(builder);
@@ -161,18 +136,19 @@ public class KStreamTransformTest {
final int[] expectedKeys = {1, 10, 100, 1000};
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
- KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+ KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(transformerSupplier).process(processor);
- driver = new TopologyTestDriver(builder.build(), props, 0L);
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
- }
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+ }
- // This tick will yield yields the "-1:2" result
- driver.advanceWallClockTime(2);
- // This tick further advances the clock to 3, which leads to the "-1:3" result
- driver.advanceWallClockTime(1);
+ // This tick yields the "-1:2" result
+ driver.advanceWallClockTime(2);
+ // This tick further advances the clock to 3, which leads to the "-1:3" result
+ driver.advanceWallClockTime(1);
+ }
assertEquals(6, processor.theCapturedProcessor().processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 6bfc813..419e6f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -17,11 +17,9 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
@@ -34,9 +32,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Properties;
@@ -47,31 +43,9 @@ import static org.junit.Assert.fail;
public class KStreamTransformValuesTest {
private String topicName = "topic";
-
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
-
+ private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
@Test
public void testTransform() {
@@ -109,13 +83,13 @@ public class KStreamTransformValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
KStream<Integer, Integer> stream;
- stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+ stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transformValues(valueTransformerSupplier).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+ }
}
String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
@@ -152,13 +126,13 @@ public class KStreamTransformValuesTest {
final int[] expectedKeys = {1, 10, 100, 1000};
KStream<Integer, Integer> stream;
- stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+ stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transformValues(valueTransformerSupplier).process(supplier);
- driver = new TopologyTestDriver(builder.build(), props);
-
- for (int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+ }
}
String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 9050edb..7a2a8e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -16,14 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -39,9 +37,7 @@ import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.List;
@@ -54,28 +50,8 @@ import static org.junit.Assert.assertEquals;
public class KStreamWindowAggregateTest {
- final private Serde<String> strSerde = Serdes.String();
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-window-aggregate-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@Test
public void testAggBasic() {
@@ -83,31 +59,31 @@ public class KStreamWindowAggregateTest {
final String topic1 = "topic1";
final KTable<Windowed<String>, String> table2 = builder
- .stream(topic1, Consumed.with(strSerde, strSerde))
- .groupByKey(Serialized.with(strSerde, strSerde))
- .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
+ .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic1-Canonized");
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table2.toStream().process(supplier);
- driver = new TopologyTestDriver(builder.build(), props, 0L);
-
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
- driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
- driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
-
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
- driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
- driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
- driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
- driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
+ }
assertEquals(
@@ -141,16 +117,16 @@ public class KStreamWindowAggregateTest {
final String topic2 = "topic2";
final KTable<Windowed<String>, String> table1 = builder
- .stream(topic1, Consumed.with(strSerde, strSerde))
- .groupByKey(Serialized.with(strSerde, strSerde))
- .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
+ .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic1-Canonized");
final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
table1.toStream().process(supplier);
final KTable<Windowed<String>, String> table2 = builder
- .stream(topic2, Consumed.with(strSerde, strSerde)).groupByKey(Serialized.with(strSerde, strSerde))
- .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic2-Canonized");
+ .stream(topic2, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), Serdes.String(), "topic2-Canonized");
table2.toStream().process(supplier);
@@ -162,84 +138,84 @@ public class KStreamWindowAggregateTest {
}
}).toStream().process(supplier);
- driver = new TopologyTestDriver(builder.build(), props, 0L);
-
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
- driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
- driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
-
- final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
-
- processors.get(0).checkAndClearProcessResult(
- "[A@0/10]:0+1",
- "[B@0/10]:0+2",
- "[C@0/10]:0+3",
- "[D@0/10]:0+4",
- "[A@0/10]:0+1+1"
- );
- processors.get(1).checkAndClearProcessResult();
- processors.get(2).checkAndClearProcessResult();
-
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
- driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
- driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
- driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
-
- processors.get(0).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
- "[B@0/10]:0+2+2", "[B@5/15]:0+2",
- "[D@0/10]:0+4+4", "[D@5/15]:0+4",
- "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
- "[C@0/10]:0+3+3", "[C@5/15]:0+3"
- );
- processors.get(1).checkAndClearProcessResult();
- processors.get(2).checkAndClearProcessResult();
-
- driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
- driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
- driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
- driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
- driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
-
- processors.get(0).checkAndClearProcessResult();
- processors.get(1).checkAndClearProcessResult(
- "[A@0/10]:0+a",
- "[B@0/10]:0+b",
- "[C@0/10]:0+c",
- "[D@0/10]:0+d",
- "[A@0/10]:0+a+a"
- );
- processors.get(2).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1%0+a",
- "[B@0/10]:0+2+2+2%0+b",
- "[C@0/10]:0+3+3%0+c",
- "[D@0/10]:0+4+4%0+d",
- "[A@0/10]:0+1+1+1%0+a+a");
-
- driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
- driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
- driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
- driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
- driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
-
- processors.get(0).checkAndClearProcessResult();
- processors.get(1).checkAndClearProcessResult(
- "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
- "[B@0/10]:0+b+b", "[B@5/15]:0+b",
- "[D@0/10]:0+d+d", "[D@5/15]:0+d",
- "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
- "[C@0/10]:0+c+c", "[C@5/15]:0+c"
- );
- processors.get(2).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
- "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
- "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
- "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
- "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
- );
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+ final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+
+ processors.get(0).checkAndClearProcessResult(
+ "[A@0/10]:0+1",
+ "[B@0/10]:0+2",
+ "[C@0/10]:0+3",
+ "[D@0/10]:0+4",
+ "[A@0/10]:0+1+1"
+ );
+ processors.get(1).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+
+ processors.get(0).checkAndClearProcessResult(
+ "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+ "[B@0/10]:0+2+2", "[B@5/15]:0+2",
+ "[D@0/10]:0+4+4", "[D@5/15]:0+4",
+ "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+ "[C@0/10]:0+3+3", "[C@5/15]:0+3"
+ );
+ processors.get(1).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
+
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
+ driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
+ driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
+ driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
+
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(
+ "[A@0/10]:0+a",
+ "[B@0/10]:0+b",
+ "[C@0/10]:0+c",
+ "[D@0/10]:0+d",
+ "[A@0/10]:0+a+a"
+ );
+ processors.get(2).checkAndClearProcessResult(
+ "[A@0/10]:0+1+1+1%0+a",
+ "[B@0/10]:0+2+2+2%0+b",
+ "[C@0/10]:0+3+3%0+c",
+ "[D@0/10]:0+4+4%0+d",
+ "[A@0/10]:0+1+1+1%0+a+a");
+
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
+ driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
+ driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
+ driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
+ driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
+
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(
+ "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
+ "[B@0/10]:0+b+b", "[B@5/15]:0+b",
+ "[D@0/10]:0+d+d", "[D@5/15]:0+d",
+ "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
+ "[C@0/10]:0+c+c", "[C@5/15]:0+c"
+ );
+ processors.get(2).checkAndClearProcessResult(
+ "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
+ "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
+ "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
+ "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
+ "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
+ );
+ }
}
@Test
@@ -247,22 +223,22 @@ public class KStreamWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
- final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(strSerde, strSerde));
- stream1.groupByKey(Serialized.with(strSerde, strSerde))
+ final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+ stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(10).advanceBy(5))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.<String, String>toStringInstance("+"),
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(strSerde)
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String())
);
- driver = new TopologyTestDriver(builder.build(), props, 0L);
-
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.pipeInput(recordFactory.create(topic, null, "1"));
- LogCaptureAppender.unregister(appender);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+ driver.pipeInput(recordFactory.create(topic, null, "1"));
+ LogCaptureAppender.unregister(appender);
- assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 30c0a7a..a6b6c64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -17,23 +17,19 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.ArrayList;
@@ -48,29 +44,8 @@ import static org.junit.Assert.assertEquals;
public class KTableForeachTest {
final private String topicName = "topic";
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
- private TopologyTestDriver driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-foreach-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
@Test
public void testForeach() {
@@ -101,17 +76,17 @@ public class KTableForeachTest {
// When
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, String> table = builder.table(topicName,
- Consumed.with(intSerde, stringSerde),
+ Consumed.with(Serdes.Integer(), Serdes.String()),
Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
- .withKeySerde(intSerde)
- .withValueSerde(stringSerde));
+ .withKeySerde(Serdes.Integer())
+ .withValueSerde(Serdes.String()));
table.foreach(action);
// Then
- driver = new TopologyTestDriver(builder.build(), props);
-
- for (KeyValue<Integer, String> record: inputRecords) {
- driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ for (KeyValue<Integer, String> record : inputRecords) {
+ driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
+ }
}
assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index cf4460d..bcb9856 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -44,6 +44,11 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+/**
+ * KStreamTestDriver
+ *
+ * @deprecated please use {@link org.apache.kafka.streams.TopologyTestDriver} instead
+ */
@Deprecated
public class KStreamTestDriver extends ExternalResource {
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index a19b55c..9406519 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -58,6 +59,28 @@ public final class StreamsTestUtils {
}
+ public static Properties topologyTestConfig(final String applicationId,
+ final String bootstrapServers,
+ final String keyDeserializer,
+ final String valueDeserializer) {
+ final Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keyDeserializer);
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueDeserializer);
+ return props;
+ }
+
+ public static Properties topologyTestConfig(final Serde keyDeserializer,
+ final Serde valueDeserializer) {
+ return topologyTestConfig(
+ UUID.randomUUID().toString(),
+ "localhost:9091",
+ keyDeserializer.getClass().getName(),
+ valueDeserializer.getClass().getName());
+ }
+
public static Properties minimalStreamsConfig() {
final Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index f6bbc4b..7b8bdd5 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -226,7 +226,7 @@ public class TopologyTestDriver implements Closeable {
* @param builder builder for the topology to be tested
* @param config the configuration for the topology
*/
- protected TopologyTestDriver(final InternalTopologyBuilder builder,
+ TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config) {
this(builder, config, System.currentTimeMillis());
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.