You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/26 18:30:46 UTC
[kafka] branch trunk updated: KAFKA-6474: Rewrite tests to use new
public TopologyTestDriver [partial] (#4832)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 885abbf KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [partial] (#4832)
885abbf is described below
commit 885abbfcd40aab57acec278d976956f07be15090
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Thu Apr 26 19:30:42 2018 +0100
KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [partial] (#4832)
* Remove ProcessorTopologyTestDriver from TopologyTest
* Fix ProcessorTopologyTest
* Remove ProcessorTopologyTestDriver and InternalTopologyAccessor
* Partially refactored StreamsBuilderTest but missing one test
* Refactor KStreamBuilderTest
* Refactor AbstractStreamTest
* Further cleanup of AbstractStreamTest
* Refactor GlobalKTableJoinsTest
* Refactor InternalStreamsBuilderTest
* Fix circular dependency in build.gradle
* Refactor KGroupedStreamImplTest
* Partial modifications to KGroupedTableImplTest
* Refactor KGroupedTableImplTest
* Refactor KStreamBranchTest
* Refactor KStreamFilterTest
* Refactor KStreamFlatMapTest KStreamFlatMapValuesTest
* Refactor KStreamForeachTest
* Refactor KStreamGlobalKTableJoinTest
* Refactor KStreamGlobalKTableLeftJoinTest
* Refactor KStreamImplTest
* Refactor KStreamImplTest
* Refactor KStreamKStreamJoinTest
* Refactor KStreamKStreamLeftJoinTest
* Refactor KStreamKTableJoinTest
* Refactor KStreamKTableLeftJoinTest
* Refactor KStreamMapTest and KStreamMapValuesTest
* Refactor KStreamPeekTest and KStreamTransformTest
* Refactor KStreamSelectKeyTest
* Refactor KStreamTransformValuesTest
* Refactor KStreamWindowAggregateTest
* Add Depercation anotation to KStreamTestDriver and rollback failing tests in StreamsBuilderTest and KTableAggregateTest
* Refactor KTableFilterTest
* Refactor KTableForeachTest
* Add getter for ProcessorTopology, and simplify tests in StreamsBuilderTest
* Refactor KTableImplTest
* Remove unused imports
* Refactor KTableAggregateTest
* Fix style errors
* Fix gradle build
* Address reviewer comments:
- Remove properties new instance
- Remove extraneous line
- Remove unnecessary TopologyTestDriver instances from StreamsBuilderTest
- Move props.clear() to @After
- Clarify use of timestamp in KStreamFlatMapValuesTest
- Keep test using old Punctuator in KStreamTransformTest
- Add comment to clarify clock advances in KStreamTransformTest
- Add TopologyTestDriverWrapper class to access the protected constructor of TopologyTestDriver
- Revert KTableImplTest.testRepartition to KStreamTestDriver to avoid exposing the TopologyTestDriver processor topology
- Revert partially migrated classes: KTableAggregateTest, KTableFilterTest, and KTableImplTest
* Rebase on current trunk an fix conflicts
Reviewers: Matthias J Sax <ma...@confluentio>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../apache/kafka/streams/StreamsBuilderTest.java | 113 ++---
.../org/apache/kafka/streams/TopologyTest.java | 4 +-
...ccessor.java => TopologyTestDriverWrapper.java} | 17 +-
.../kafka/streams/kstream/KStreamBuilderTest.java | 53 ++-
.../kstream/internals/AbstractStreamTest.java | 23 +-
.../kstream/internals/GlobalKTableJoinsTest.java | 45 +-
.../internals/InternalStreamsBuilderTest.java | 38 +-
.../kstream/internals/KGroupedStreamImplTest.java | 143 +++---
.../kstream/internals/KGroupedTableImplTest.java | 67 +--
.../kstream/internals/KStreamBranchTest.java | 39 +-
.../kstream/internals/KStreamFilterTest.java | 46 +-
.../kstream/internals/KStreamFlatMapTest.java | 37 +-
.../internals/KStreamFlatMapValuesTest.java | 42 +-
.../kstream/internals/KStreamForeachTest.java | 44 +-
.../internals/KStreamGlobalKTableJoinTest.java | 51 ++-
.../internals/KStreamGlobalKTableLeftJoinTest.java | 34 +-
.../streams/kstream/internals/KStreamImplTest.java | 93 ++--
.../kstream/internals/KStreamKStreamJoinTest.java | 354 ++++++++-------
.../internals/KStreamKStreamLeftJoinTest.java | 122 +++--
.../kstream/internals/KStreamKTableJoinTest.java | 52 ++-
.../internals/KStreamKTableLeftJoinTest.java | 35 +-
.../streams/kstream/internals/KStreamMapTest.java | 39 +-
.../kstream/internals/KStreamMapValuesTest.java | 43 +-
.../streams/kstream/internals/KStreamPeekTest.java | 39 +-
.../kstream/internals/KStreamSelectKeyTest.java | 38 +-
.../kstream/internals/KStreamTransformTest.java | 107 ++++-
.../internals/KStreamTransformValuesTest.java | 44 +-
.../internals/KStreamWindowAggregateTest.java | 200 +++------
.../kstream/internals/KTableForeachTest.java | 39 +-
.../streams/processor/TopologyBuilderTest.java | 17 +-
.../internals/InternalTopologyBuilderTest.java | 4 +-
.../processor/internals/ProcessorTopologyTest.java | 115 ++---
.../org/apache/kafka/test/KStreamTestDriver.java | 1 +
.../kafka/test/ProcessorTopologyTestDriver.java | 490 ---------------------
.../apache/kafka/streams/TopologyTestDriver.java | 28 +-
35 files changed, 1290 insertions(+), 1366 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4a496b8..d3e01fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.streams;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
@@ -28,13 +30,14 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
-import org.junit.Rule;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
@@ -43,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -54,9 +58,26 @@ import static org.junit.Assert.assertFalse;
public class StreamsBuilderTest {
private final StreamsBuilder builder = new StreamsBuilder();
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-builder-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test(expected = TopologyException.class)
public void testFrom() {
@@ -70,9 +91,7 @@ public class StreamsBuilderTest {
final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes, String>allGoodPredicate());
builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
- driver.setUp(builder, TestUtils.tempDirectory());
-
- ProcessorTopology topology = builder.internalTopologyBuilder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.build();
assertThat(topology.stateStores().size(), equalTo(1));
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -85,9 +104,7 @@ public class StreamsBuilderTest {
.filter(MockPredicate.<Bytes, String>allGoodPredicate(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
- driver.setUp(builder, TestUtils.tempDirectory());
-
- ProcessorTopology topology = builder.internalTopologyBuilder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.build();
assertThat(topology.stateStores().size(), equalTo(2));
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
@@ -99,9 +116,7 @@ public class StreamsBuilderTest {
final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.<String>noOpValueMapper());
builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
- driver.setUp(builder, TestUtils.tempDirectory());
-
- ProcessorTopology topology = builder.internalTopologyBuilder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.build();
assertThat(topology.stateStores().size(), equalTo(1));
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -114,9 +129,7 @@ public class StreamsBuilderTest {
.mapValues(MockMapper.<String>noOpValueMapper(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
- driver.setUp(builder, TestUtils.tempDirectory());
-
- ProcessorTopology topology = builder.internalTopologyBuilder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.build();
assertThat(topology.stateStores().size(), equalTo(2));
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store")));
@@ -129,14 +142,11 @@ public class StreamsBuilderTest {
final KTable<Bytes, String> table2 = builder.table("table-topic2");
builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
- driver.setUp(builder, TestUtils.tempDirectory());
-
- ProcessorTopology topology = builder.internalTopologyBuilder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.build();
assertThat(topology.stateStores().size(), equalTo(2));
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(), is(true));
-
}
@Test
@@ -145,9 +155,7 @@ public class StreamsBuilderTest {
final KTable<Bytes, String> table2 = builder.table("table-topic2");
builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store")), MockValueJoiner.TOSTRING_JOINER);
- driver.setUp(builder, TestUtils.tempDirectory());
-
- ProcessorTopology topology = builder.internalTopologyBuilder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.build();
assertThat(topology.stateStores().size(), equalTo(3));
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Collections.singleton("store")));
@@ -159,9 +167,7 @@ public class StreamsBuilderTest {
final KTable<Bytes, String> table = builder.table("table-topic");
builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
- driver.setUp(builder, TestUtils.tempDirectory());
-
- ProcessorTopology topology = builder.internalTopologyBuilder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.build();
assertThat(topology.stateStores().size(), equalTo(1));
assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), equalTo(Collections.singleton(topology.stateStores().get(0).name())));
@@ -177,10 +183,10 @@ public class StreamsBuilderTest {
source.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
- driver.process("topic-source", "A", "aa");
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
// no exception was thrown
assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
@@ -197,10 +203,10 @@ public class StreamsBuilderTest {
source.process(sourceProcessorSupplier);
through.process(throughProcessorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
- driver.process("topic-source", "A", "aa");
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
@@ -218,13 +224,13 @@ public class StreamsBuilderTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
- driver.process(topic1, "A", "aa");
- driver.process(topic2, "B", "bb");
- driver.process(topic2, "C", "cc");
- driver.process(topic1, "D", "dd");
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+ driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+ driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
@@ -244,12 +250,13 @@ public class StreamsBuilderTest {
.withValueSerde(Serdes.String()))
.toStream().foreach(action);
- driver.setUp(builder, TestUtils.tempDirectory());
- driver.setTime(0L);
- driver.process(topic, 1L, "value1");
- driver.process(topic, 2L, "value2");
- driver.flushState();
- final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store");
+ driver = new TopologyTestDriver(builder.build(), props);
+
+ final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+ driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+
+ final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
assertThat(store.get(1L), equalTo("value1"));
assertThat(store.get(2L), equalTo("value2"));
assertThat(results.get(1L), equalTo("value1"));
@@ -262,12 +269,14 @@ public class StreamsBuilderTest {
builder.globalTable(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
- driver.setUp(builder, TestUtils.tempDirectory());
- driver.setTime(0L);
- driver.process(topic, 1L, "value1");
- driver.process(topic, 2L, "value2");
- driver.flushState();
- final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store");
+
+ driver = new TopologyTestDriver(builder.build(), props);
+
+ final ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
+ driver.pipeInput(recordFactory.create(topic, 2L, "value2"));
+ final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
+
assertThat(store.get(1L), equalTo("value1"));
assertThat(store.get(2L), equalTo("value2"));
}
@@ -295,12 +304,12 @@ public class StreamsBuilderTest {
}
@Test(expected = TopologyException.class)
- public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+ public void shouldThrowExceptionWhenNoTopicPresent() {
builder.stream(Collections.<String>emptyList());
}
@Test(expected = NullPointerException.class)
- public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+ public void shouldThrowExceptionWhenTopicNamesAreNull() {
builder.stream(Arrays.<String>asList(null, null));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 6834091..eee3386 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStore;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -261,7 +260,6 @@ public class TopologyTest {
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- final StreamsConfig streamsConfig = new StreamsConfig(config);
mockStoreBuilder();
EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStore("store", false));
EasyMock.replay(storeBuilder);
@@ -274,7 +272,7 @@ public class TopologyTest {
.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
try {
- new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+ new TopologyTestDriver(topology, config);
fail("Should have thrown StreamsException");
} catch (final StreamsException e) {
final String error = e.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
similarity index 58%
rename from streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
rename to streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
index c3c4504..fa976a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import java.util.Properties;
/**
- * This class is meant for testing purposes only and allows the testing of
- * topologies by using the {@link org.apache.kafka.test.ProcessorTopologyTestDriver}
+ * This class allows the instantiation of a {@link TopologyTestDriver} using a
+ * {@link InternalTopologyBuilder} by exposing a protected constructor.
+ *
+ * It should be used only for testing, and should be removed once the deprecated
+ * classes {@link org.apache.kafka.streams.kstream.KStreamBuilder} and
+ * {@link org.apache.kafka.streams.processor.TopologyBuilder} are removed.
*/
-public class InternalTopologyAccessor {
+public class TopologyTestDriverWrapper extends TopologyTestDriver {
- public static InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) {
- return topology.internalTopologyBuilder;
+ public TopologyTestDriverWrapper(final InternalTopologyBuilder builder,
+ final Properties config) {
+ super(builder, config);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index f9949d3..81bdb31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -18,7 +18,10 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
@@ -26,19 +29,21 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -55,12 +60,27 @@ public class KStreamBuilderTest {
private static final String APP_ID = "app-id";
private final KStreamBuilder builder = new KStreamBuilder();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private TopologyTestDriverWrapper driver;
+ private final Properties props = new Properties();
@Before
- public void setUp() {
+ public void setup() {
builder.setApplicationId(APP_ID);
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-builder-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@Test(expected = TopologyBuilderException.class)
@@ -93,10 +113,8 @@ public class KStreamBuilderTest {
source.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
-
- driver.process("topic-source", "A", "aa");
+ driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
// no exception was thrown
assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
@@ -113,10 +131,8 @@ public class KStreamBuilderTest {
source.process(sourceProcessorSupplier);
through.process(throughProcessorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
-
- driver.process("topic-source", "A", "aa");
+ driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
@@ -147,13 +163,12 @@ public class KStreamBuilderTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
- driver.process(topic1, "A", "aa");
- driver.process(topic2, "B", "bb");
- driver.process(topic2, "C", "cc");
- driver.process(topic1, "D", "dd");
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+ driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+ driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index dcfb9ba..2aa07f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -16,20 +16,25 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
import org.junit.Test;
+import java.util.Properties;
import java.util.Random;
import static org.easymock.EasyMock.createMock;
@@ -40,10 +45,6 @@ import static org.junit.Assert.assertTrue;
public class AbstractStreamTest {
- private final String topicName = "topic";
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
-
@Test
public void testToInternlValueTransformerSupplierSuppliesNewTransformers() {
final ValueTransformerSupplier vts = createMock(ValueTransformerSupplier.class);
@@ -82,9 +83,15 @@ public class AbstractStreamTest {
stream.randomFilter().process(processor);
- driver.setUp(builder);
+ final Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+
+ final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, "V" + expectedKey);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
assertTrue(processor.processed.size() <= expectedKeys.length);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index eebb7d2..8c50afe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -17,22 +17,25 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -43,17 +46,15 @@ public class GlobalKTableJoinsTest {
private final Map<String, String> results = new HashMap<>();
private final String streamTopic = "stream";
private final String globalTopic = "global";
- private File stateDir;
private GlobalKTable<String, String> global;
private KStream<String, String> stream;
private KeyValueMapper<String, String, String> keyValueMapper;
private ForeachAction<String, String> action;
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private TopologyTestDriver driver;
+
@Before
public void setUp() {
- stateDir = TestUtils.tempDirectory();
final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
global = builder.globalTable(globalTopic, consumed);
stream = builder.stream(streamTopic, consumed);
@@ -71,6 +72,14 @@ public class GlobalKTableJoinsTest {
};
}
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
@Test
public void shouldLeftJoinWithStream() {
stream
@@ -100,16 +109,22 @@ public class GlobalKTableJoinsTest {
}
private void verifyJoin(final Map<String, String> expected) {
- driver.setUp(builder, stateDir);
- driver.setTime(0L);
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
+ final Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "global-ktable-joins-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+
+ driver = new TopologyTestDriver(builder.build(), props);
+
// write some data to the global table
- driver.process(globalTopic, "a", "A");
- driver.process(globalTopic, "b", "B");
+ driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
+ driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
//write some data to the stream
- driver.process(streamTopic, "1", "a");
- driver.process(streamTopic, "2", "b");
- driver.process(streamTopic, "3", "c");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
+ driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
+ driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
assertEquals(expected, results);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index b9ba608..76ca495 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -30,11 +30,9 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -58,8 +56,6 @@ public class InternalStreamsBuilderTest {
private static final String APP_ID = "app-id";
private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-
- private KStreamTestDriver driver = null;
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
private final String storePrefix = "prefix-";
private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
@@ -70,14 +66,6 @@ public class InternalStreamsBuilderTest {
builder.internalTopologyBuilder.setApplicationId(APP_ID);
}
- @After
- public void cleanup() {
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
@Test
public void testNewName() {
assertEquals("X-0000000000", builder.newProcessorName("X-"));
@@ -105,7 +93,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+ public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
final String topic1 = "topic-1";
final String topic2 = "topic-2";
final String topic3 = "topic-3";
@@ -138,7 +126,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() throws Exception {
+ public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
KTable table1 = builder.table("topic2",
consumed,
new MaterializedInternal<>(
@@ -158,7 +146,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
+ public void shouldBuildGlobalTableWithNonQueryableStoreName() {
final GlobalKTable<String, String> table1 = builder.globalTable(
"topic2",
consumed,
@@ -171,7 +159,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
+ public void shouldBuildGlobalTableWithQueryaIbleStoreName() {
final GlobalKTable<String, String> table1 = builder.globalTable(
"topic2",
consumed,
@@ -184,7 +172,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldBuildSimpleGlobalTableTopology() throws Exception {
+ public void shouldBuildSimpleGlobalTableTopology() {
builder.globalTable("table",
consumed,
new MaterializedInternal<>(
@@ -199,7 +187,7 @@ public class InternalStreamsBuilderTest {
assertEquals("globalTable", stateStores.get(0).name());
}
- private void doBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+ private void doBuildGlobalTopologyWithAllGlobalTables() {
final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
final List<StateStore> stateStores = topology.globalStateStores();
@@ -210,7 +198,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+ public void shouldBuildGlobalTopologyWithAllGlobalTables() {
builder.globalTable("table",
consumed,
new MaterializedInternal<>(
@@ -224,7 +212,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldAddGlobalTablesToEachGroup() throws Exception {
+ public void shouldAddGlobalTablesToEachGroup() {
final String one = "globalTable";
final String two = "globalTable2";
@@ -269,7 +257,7 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+ public void shouldMapStateStoresToCorrectSourceTopics() {
final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
@@ -367,14 +355,14 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
+ public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
builder.stream(Collections.singleton("topic"), consumed);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
- public void shouldUseProvidedTimestampExtractor() throws Exception {
+ public void shouldUseProvidedTimestampExtractor() {
final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
builder.stream(Collections.singleton("topic"), consumed);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
@@ -382,14 +370,14 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
+ public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() {
builder.table("topic", consumed, materialized);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
- public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
+ public void ktableShouldUseProvidedTimestampExtractor() {
final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
builder.table("topic", consumed, materialized);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index c74d0dc..b9ca30f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -20,10 +20,13 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
@@ -43,13 +46,13 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
@@ -57,6 +60,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -72,13 +76,30 @@ public class KGroupedStreamImplTest {
private static final String INVALID_STORE_NAME = "~foo bar~";
private final StreamsBuilder builder = new StreamsBuilder();
private KGroupedStream<String, String> groupedStream;
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
@Before
public void before() {
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
+
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-stream-impl-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@SuppressWarnings("deprecation")
@@ -203,20 +224,13 @@ public class KGroupedStreamImplTest {
}
private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
- driver.setUp(builder, TestUtils.tempDirectory());
- driver.setTime(10);
- driver.process(TOPIC, "1", "1");
- driver.setTime(15);
- driver.process(TOPIC, "2", "2");
- driver.setTime(30);
- driver.process(TOPIC, "1", "1");
- driver.setTime(70);
- driver.process(TOPIC, "1", "1");
- driver.setTime(90);
- driver.process(TOPIC, "1", "1");
- driver.setTime(100);
- driver.process(TOPIC, "1", "1");
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -284,20 +298,13 @@ public class KGroupedStreamImplTest {
}
private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
- driver.setUp(builder, TestUtils.tempDirectory());
- driver.setTime(10);
- driver.process(TOPIC, "1", "1");
- driver.setTime(15);
- driver.process(TOPIC, "2", "2");
- driver.setTime(30);
- driver.process(TOPIC, "1", "1");
- driver.setTime(70);
- driver.process(TOPIC, "1", "1");
- driver.setTime(90);
- driver.process(TOPIC, "1", "1");
- driver.setTime(100);
- driver.process(TOPIC, "1", "1");
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -334,20 +341,13 @@ public class KGroupedStreamImplTest {
}
private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
- driver.setUp(builder, TestUtils.tempDirectory());
- driver.setTime(10);
- driver.process(TOPIC, "1", "A");
- driver.setTime(15);
- driver.process(TOPIC, "2", "Z");
- driver.setTime(30);
- driver.process(TOPIC, "1", "B");
- driver.setTime(70);
- driver.process(TOPIC, "1", "A");
- driver.setTime(90);
- driver.process(TOPIC, "1", "B");
- driver.setTime(100);
- driver.process(TOPIC, "1", "C");
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
@@ -556,8 +556,7 @@ public class KGroupedStreamImplTest {
processData();
- @SuppressWarnings("unchecked") final KeyValueStore<String, Long> count =
- (KeyValueStore<String, Long>) driver.allStateStores().get("count");
+ final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
assertThat(count.get("1"), equalTo(3L));
assertThat(count.get("2"), equalTo(1L));
@@ -571,10 +570,10 @@ public class KGroupedStreamImplTest {
processData();
LogCaptureAppender.unregister(appender);
- final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+ final Map<MetricName, ? extends Metric> metrics = driver.metrics();
assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
}
@@ -589,7 +588,7 @@ public class KGroupedStreamImplTest {
processData();
- final KeyValueStore<String, String> reduced = (KeyValueStore<String, String>) driver.allStateStores().get("reduce");
+ final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
assertThat(reduced.get("1"), equalTo("A+C+D"));
assertThat(reduced.get("2"), equalTo("B"));
@@ -609,10 +608,10 @@ public class KGroupedStreamImplTest {
processData();
LogCaptureAppender.unregister(appender);
- final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+ final Map<MetricName, ? extends Metric> metrics = driver.metrics();
assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[0] offset=[6]"));
}
@@ -628,7 +627,7 @@ public class KGroupedStreamImplTest {
processData();
- final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
+ final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
assertThat(aggregate.get("2"), equalTo("0+B"));
@@ -658,29 +657,25 @@ public class KGroupedStreamImplTest {
}
private void processData() {
- driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
- driver.setTime(0);
- driver.process(TOPIC, "1", "A");
- driver.process(TOPIC, "2", "B");
- driver.process(TOPIC, "1", "C");
- driver.process(TOPIC, "1", "D");
- driver.process(TOPIC, "3", "E");
- driver.process(TOPIC, "3", "F");
- driver.process(TOPIC, "3", null);
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A"));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B"));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "C"));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "D"));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "E"));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "F"));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", null));
}
private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
- driver.setUp(builder, TestUtils.tempDirectory(), 0);
- driver.setTime(0);
- driver.process(TOPIC, "1", "A");
- driver.process(TOPIC, "2", "B");
- driver.process(TOPIC, "3", "C");
- driver.setTime(500);
- driver.process(TOPIC, "1", "A");
- driver.process(TOPIC, "1", "A");
- driver.process(TOPIC, "2", "B");
- driver.process(TOPIC, "2", "B");
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
assertThat(results, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 3bbd7e2..742f349 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -17,11 +17,15 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
@@ -30,18 +34,19 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -55,14 +60,29 @@ public class KGroupedTableImplTest {
private final StreamsBuilder builder = new StreamsBuilder();
private static final String INVALID_STORE_NAME = "~foo bar~";
private KGroupedTable<String, String> groupedTable;
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
private final String topic = "input";
@Before
public void before() {
groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
.groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
+
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kgrouped-table-impl-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@Test
@@ -122,6 +142,7 @@ public class KGroupedTableImplTest {
private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) {
final Map<String, Integer> results = new HashMap<>();
+ final ConsumerRecordFactory<String, Double> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
reduced.foreach(new ForeachAction<String, Integer>() {
@Override
public void apply(final String key, final Integer value) {
@@ -129,20 +150,17 @@ public class KGroupedTableImplTest {
}
});
- driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
- driver.setTime(10L);
- driver.process(topic, "A", 1.1);
- driver.process(topic, "B", 2.2);
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
+ driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
assertEquals(Integer.valueOf(1), results.get("A"));
assertEquals(Integer.valueOf(2), results.get("B"));
- driver.process(topic, "A", 2.6);
- driver.process(topic, "B", 1.3);
- driver.process(topic, "A", 5.7);
- driver.process(topic, "B", 6.2);
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic, "A", 2.6, 10));
+ driver.pipeInput(recordFactory.create(topic, "B", 1.3, 10));
+ driver.pipeInput(recordFactory.create(topic, "A", 5.7, 10));
+ driver.pipeInput(recordFactory.create(topic, "B", 6.2, 10));
assertEquals(Integer.valueOf(5), results.get("A"));
assertEquals(Integer.valueOf(6), results.get("B"));
@@ -212,7 +230,7 @@ public class KGroupedTableImplTest {
.withValueSerde(Serdes.Integer()));
doShouldReduce(reduced, topic);
- final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.allStateStores().get("reduce");
+ final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.getStateStore("reduce");
assertThat(reduce.get("A"), equalTo(5));
assertThat(reduce.get("B"), equalTo(6));
}
@@ -229,7 +247,7 @@ public class KGroupedTableImplTest {
.withValueSerde(Serdes.Long()));
processData(topic);
- final KeyValueStore<String, Long> counts = (KeyValueStore<String, Long>) driver.allStateStores().get("count");
+ final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
assertThat(counts.get("1"), equalTo(3L));
assertThat(counts.get("2"), equalTo(2L));
}
@@ -249,7 +267,7 @@ public class KGroupedTableImplTest {
.withKeySerde(Serdes.String()));
processData(topic);
- final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
+ final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.getStateStore("aggregate");
assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
assertThat(aggregate.get("2"), equalTo("0+2+2"));
}
@@ -310,13 +328,12 @@ public class KGroupedTableImplTest {
}
private void processData(final String topic) {
- driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
- driver.setTime(0L);
- driver.process(topic, "A", "1");
- driver.process(topic, "B", "1");
- driver.process(topic, "C", "1");
- driver.process(topic, "D", "2");
- driver.process(topic, "E", "2");
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props);
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ driver.pipeInput(recordFactory.create(topic, "A", "1"));
+ driver.pipeInput(recordFactory.create(topic, "B", "1"));
+ driver.pipeInput(recordFactory.create(topic, "C", "1"));
+ driver.pipeInput(recordFactory.create(topic, "D", "2"));
+ driver.pipeInput(recordFactory.create(topic, "E", "2"));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 702631a..a70bc37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -16,26 +16,51 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Array;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class KStreamBranchTest {
- private String topicName = "topic";
+ private final String topicName = "topic";
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void before() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-branch-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@SuppressWarnings("unchecked")
@Test
@@ -78,9 +103,9 @@ public class KStreamBranchTest {
branches[i].process(processors[i]);
}
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, "V" + expectedKey);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
assertEquals(3, processors[0].processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index f1a6152..a67d688 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -16,25 +16,51 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Properties;
+
import static org.junit.Assert.assertEquals;
public class KStreamFilterTest {
- private String topicName = "topic";
-
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final String topicName = "topic";
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void before() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-filter-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
@Override
public boolean test(Integer key, String value) {
@@ -54,9 +80,9 @@ public class KStreamFilterTest {
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filter(isMultipleOfThree).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, "V" + expectedKey);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
assertEquals(2, processor.processed.size());
@@ -74,9 +100,9 @@ public class KStreamFilterTest {
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filterNot(isMultipleOfThree).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, "V" + expectedKey);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
assertEquals(5, processor.processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 59dad47..e414218 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -16,27 +16,52 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class KStreamFlatMapTest {
private String topicName = "topic";
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ @Before
+ public void before() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testFlatMap() {
@@ -63,9 +88,9 @@ public class KStreamFlatMapTest {
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.flatMap(mapper).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, "V" + expectedKey);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
assertEquals(6, processor.processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index ecfa3aa..14213c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -16,27 +16,51 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Properties;
import static org.junit.Assert.assertArrayEquals;
public class KStreamFlatMapValuesTest {
private String topicName = "topic";
+ private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void before() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-flat-map-values-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testFlatMapValues() {
@@ -59,9 +83,10 @@ public class KStreamFlatMapValuesTest {
final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
stream.flatMapValues(mapper).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (final int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, expectedKey);
+ // passing the timestamp to recordFactory.create to disambiguate the call
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
}
String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
@@ -92,9 +117,10 @@ public class KStreamFlatMapValuesTest {
stream.flatMapValues(mapper).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (final int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, expectedKey);
+ // passing the timestamp to recordFactory.create to disambiguate the call
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey, 0L));
}
String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index e8854fc..b975c96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -16,32 +16,58 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.junit.Rule;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class KStreamForeachTest {
- final private String topicName = "topic";
+ private final String topicName = "topic";
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private Properties props = new Properties();
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ @Before
+ public void before() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-foreach-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
+
+ private final Serde<Integer> intSerde = Serdes.Integer();
+ private final Serde<String> stringSerde = Serdes.String();
@Test
public void testForeach() {
@@ -75,9 +101,9 @@ public class KStreamForeachTest {
stream.foreach(action);
// Then
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (KeyValue<Integer, String> record: inputRecords) {
- driver.process(topicName, record.key, record.value);
+ driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
}
assertEquals(expectedRecords.size(), actualRecords.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index d1d048b..2936f5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -16,46 +16,46 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import java.util.Properties;
import java.util.Set;
import static org.junit.Assert.assertEquals;
public class KStreamGlobalKTableJoinTest {
- final private String streamTopic = "streamTopic";
- final private String globalTableTopic = "globalTableTopic";
-
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
+ private final String streamTopic = "streamTopic";
+ private final String globalTableTopic = "globalTableTopic";
+ private final Serde<Integer> intSerde = Serdes.Integer();
+ private final Serde<String> stringSerde = Serdes.String();
+ private TopologyTestDriver driver;
private MockProcessorSupplier<Integer, String> processor;
private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@Before
public void setUp() throws IOException {
- stateDir = TestUtils.tempDirectory("kafka-test");
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
@@ -78,29 +78,46 @@ public class KStreamGlobalKTableJoinTest {
};
stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
- driver.setUp(builder, stateDir);
- driver.setTime(0L);
+ final Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+ driver = new TopologyTestDriver(builder.build(), props);
+ }
+
+ @After
+ public void cleanup() {
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+ final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
value = value + ",FKey" + expectedKeys[i];
}
- driver.process(streamTopic, expectedKeys[i], value);
+ driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], value));
}
}
private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]));
}
}
private void pushNullValueToGlobalTable(final int messageCount) {
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+ driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 8b7dd42..8882113 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -16,25 +16,28 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import java.util.Properties;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -43,19 +46,15 @@ public class KStreamGlobalKTableLeftJoinTest {
final private String streamTopic = "streamTopic";
final private String globalTableTopic = "globalTableTopic";
-
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
+ private TopologyTestDriver driver;
private MockProcessorSupplier<Integer, String> processor;
private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@Before
public void setUp() throws IOException {
- stateDir = TestUtils.tempDirectory("kafka-test");
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
@@ -78,29 +77,38 @@ public class KStreamGlobalKTableLeftJoinTest {
};
stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
- driver.setUp(builder, stateDir);
- driver.setTime(0L);
+ final Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+ driver = new TopologyTestDriver(builder.build(), props);
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
+ final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
value = value + ",FKey" + expectedKeys[i];
}
- driver.process(streamTopic, expectedKeys[i], value);
+ driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], value));
}
}
private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- driver.process(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]));
}
}
private void pushNullValueToGlobalTable(final int messageCount) {
+ final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- driver.process(globalTableTopic, "FKey" + expectedKeys[i], null);
+ driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], null));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index ef65bb3..f397246 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -42,16 +45,18 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -71,13 +76,29 @@ public class KStreamImplTest {
private StreamsBuilder builder;
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
@Before
public void before() {
builder = new StreamsBuilder();
testStream = builder.stream("source");
+
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-impl-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@Test
@@ -204,8 +225,8 @@ public class KStreamImplTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
- driver.setUp(builder);
- driver.process(input, "a", "b");
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(input, "a", "b"));
assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b")));
}
@@ -218,8 +239,8 @@ public class KStreamImplTest {
stream.to("to-topic", Produced.with(stringSerde, stringSerde));
builder.stream("to-topic", consumed).process(processorSupplier);
- driver.setUp(builder);
- driver.process(input, "e", "f");
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create(input, "e", "f"));
assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f")));
}
@@ -501,13 +522,12 @@ public class KStreamImplTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
- driver.process(topic1, "A", "aa");
- driver.process(topic2, "B", "bb");
- driver.process(topic2, "C", "cc");
- driver.process(topic1, "D", "dd");
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+ driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
+ driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
@@ -528,17 +548,16 @@ public class KStreamImplTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
- driver.process(topic1, "A", "aa");
- driver.process(topic2, "B", "bb");
- driver.process(topic3, "C", "cc");
- driver.process(topic4, "D", "dd");
- driver.process(topic4, "E", "ee");
- driver.process(topic3, "F", "ff");
- driver.process(topic2, "G", "gg");
- driver.process(topic1, "H", "hh");
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
+ driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
+ driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
+ driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
+ driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
+ driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
+ driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
processorSupplier.processed);
@@ -551,14 +570,13 @@ public class KStreamImplTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
pattern2Source.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
- driver.process("topic-3", "A", "aa");
- driver.process("topic-4", "B", "bb");
- driver.process("topic-5", "C", "cc");
- driver.process("topic-6", "D", "dd");
- driver.process("topic-7", "E", "ee");
+ driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+ driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+ driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
+ driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
+ driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
processorSupplier.processed);
@@ -576,14 +594,13 @@ public class KStreamImplTest {
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
- driver.setUp(builder);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
- driver.process("topic-3", "A", "aa");
- driver.process("topic-4", "B", "bb");
- driver.process("topic-A", "C", "cc");
- driver.process("topic-Z", "D", "dd");
- driver.process(topic3, "E", "ee");
+ driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
+ driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
+ driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
+ driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
+ driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
processorSupplier.processed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 4d2bce5..63a040a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -16,30 +16,32 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -55,14 +57,27 @@ public class KStreamKStreamJoinTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
@Before
public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-join-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@Test
@@ -71,6 +86,7 @@ public class KStreamKStreamJoinTest {
final KStream<String, Integer> left = builder.stream("left", Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> right = builder.stream("right", Consumed.with(stringSerde, intSerde));
+ final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
left.join(
right,
@@ -85,13 +101,13 @@ public class KStreamKStreamJoinTest {
);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.setUp(builder, stateDir);
- driver.process("left", "A", null);
+ driver = new TopologyTestDriver(builder.build(), props);
+ driver.pipeInput(recordFactory.create("left", "A", null));
LogCaptureAppender.unregister(appender);
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[-1] offset=[-1]"));
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
- assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
}
@Test
@@ -120,8 +136,7 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props);
// push two items to the primary stream. the other window is empty
// w1 = {}
@@ -130,7 +145,7 @@ public class KStreamKStreamJoinTest {
// w2 = {}
for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
}
processor.checkAndClearProcessResult();
@@ -142,7 +157,7 @@ public class KStreamKStreamJoinTest {
// w2 = { 0:Y0, 1:Y1 }
for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
}
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -153,8 +168,8 @@ public class KStreamKStreamJoinTest {
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
// w2 = { 0:Y0, 1:Y1 }
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "X" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
}
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -165,8 +180,8 @@ public class KStreamKStreamJoinTest {
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
}
processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -177,8 +192,8 @@ public class KStreamKStreamJoinTest {
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
}
processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
@@ -190,7 +205,7 @@ public class KStreamKStreamJoinTest {
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
}
processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
@@ -222,8 +237,7 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
// push two items to the primary stream. the other window is empty.this should produce two items
// w1 = {}
@@ -232,7 +246,7 @@ public class KStreamKStreamJoinTest {
// w2 = {}
for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
}
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
@@ -244,7 +258,7 @@ public class KStreamKStreamJoinTest {
// w2 = { 0:Y0, 1:Y1 }
for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
}
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -255,8 +269,8 @@ public class KStreamKStreamJoinTest {
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
// w2 = { 0:Y0, 1:Y1 }
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "X" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
}
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
@@ -267,8 +281,8 @@ public class KStreamKStreamJoinTest {
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
}
processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -279,8 +293,8 @@ public class KStreamKStreamJoinTest {
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
}
processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
@@ -292,7 +306,7 @@ public class KStreamKStreamJoinTest {
// w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "YYY" + expectedKeys[i]));
}
processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
@@ -327,17 +341,15 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
-
+ driver = new TopologyTestDriver(builder.build(), props, time);
// push two items to the primary stream. the other window is empty. this should produce no items.
// w1 = {}
// w2 = {}
// --> w1 = { 0:X0, 1:X1 }
// w2 = {}
- setRecordContext(time, topic1);
for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
}
processor.checkAndClearProcessResult();
@@ -348,58 +360,53 @@ public class KStreamKStreamJoinTest {
// --> w1 = { 0:X0, 1:X1 }
// w2 = { 0:Y0, 1:Y1 }
- setRecordContext(time, topic2);
for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
}
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// clear logically
time = 1000L;
- setRecordContext(time, topic1);
for (int i = 0; i < expectedKeys.length; i++) {
- setRecordContext(time + i, topic1);
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
}
processor.checkAndClearProcessResult();
// gradually expires items in w1
// w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
- time = 1000 + 100L;
- setRecordContext(time, topic2);
-
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 100L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult();
@@ -407,37 +414,36 @@ public class KStreamKStreamJoinTest {
// go back to the time before expiration
time = 1000L - 100L - 1L;
- setRecordContext(time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult();
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
@@ -445,8 +451,7 @@ public class KStreamKStreamJoinTest {
// clear (logically)
time = 2000L;
for (int i = 0; i < expectedKeys.length; i++) {
- setRecordContext(time + i, topic2);
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
}
processor.checkAndClearProcessResult();
@@ -455,37 +460,36 @@ public class KStreamKStreamJoinTest {
// w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
time = 2000L + 100L;
- setRecordContext(time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("3:XX3+Y3");
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult();
@@ -493,37 +497,36 @@ public class KStreamKStreamJoinTest {
// go back to the time before expiration
time = 2000L - 100L - 1L;
- setRecordContext(time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult();
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:XX0+Y0");
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
- setRecordContext(++time, topic1);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
@@ -560,85 +563,80 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
+ driver = new TopologyTestDriver(builder.build(), props, time);
for (int i = 0; i < expectedKeys.length; i++) {
- setRecordContext(time + i, topic1);
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
}
processor.checkAndClearProcessResult();
time = 1000L - 1L;
- setRecordContext(time, topic2);
-
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult();
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
time = 1000 + 100L;
- setRecordContext(time, topic2);
-
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult();
@@ -673,90 +671,82 @@ public class KStreamKStreamJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
+ driver = new TopologyTestDriver(builder.build(), props, time);
for (int i = 0; i < expectedKeys.length; i++) {
- setRecordContext(time + i, topic1);
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
}
processor.checkAndClearProcessResult();
time = 1000L - 100L - 1L;
-
- setRecordContext(time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult();
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- time = 1000L;
- setRecordContext(time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time = 1000L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult("3:X3+YY3");
- setRecordContext(++time, topic2);
- for (final int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ time += 1L;
+ for (int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, time));
}
processor.checkAndClearProcessResult();
}
-
- private void setRecordContext(final long time, final String topic) {
- ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 465082b..cb1aaf1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -16,29 +16,30 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Properties;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -50,15 +51,27 @@ public class KStreamKStreamLeftJoinTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
-
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private Properties props = new Properties();
@Before
- public void setUp() throws IOException {
- stateDir = TestUtils.tempDirectory("kafka-test");
+ public void setUp() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-kstream-left-join-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@Test
@@ -87,8 +100,7 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
- driver.setTime(0L);
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
// push two items to the primary stream. the other window is empty
// w1 {}
@@ -97,9 +109,8 @@ public class KStreamKStreamLeftJoinTest {
// --> w2 = {}
for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce two items.
@@ -109,9 +120,8 @@ public class KStreamKStreamLeftJoinTest {
// --> w2 = { 0:Y0, 1:Y1 }
for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
@@ -122,9 +132,8 @@ public class KStreamKStreamLeftJoinTest {
// --> w2 = { 0:Y0, 1:Y1 }
for (int i = 0; i < 3; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
// push all items to the other stream. this should produce 5 items
@@ -134,9 +143,8 @@ public class KStreamKStreamLeftJoinTest {
// --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
for (int expectedKey : expectedKeys) {
- driver.process(topic2, expectedKey, "YY" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
// push all four items to the primary stream. this should produce six items.
@@ -146,9 +154,8 @@ public class KStreamKStreamLeftJoinTest {
// --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
}
@@ -178,7 +185,7 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
+ driver = new TopologyTestDriver(builder.build(), props, time);
// push two items to the primary stream. the other window is empty. this should produce two items
// w1 = {}
@@ -186,11 +193,9 @@ public class KStreamKStreamLeftJoinTest {
// --> w1 = { 0:X0, 1:X1 }
// --> w2 = {}
- setRecordContext(time, topic1);
for (int i = 0; i < 2; i++) {
- driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce no items.
@@ -199,16 +204,13 @@ public class KStreamKStreamLeftJoinTest {
// --> w1 = { 0:X0, 1:X1 }
// --> w2 = { 0:Y0, 1:Y1 }
- setRecordContext(time, topic2);
for (int i = 0; i < 2; i++) {
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// clear logically
time = 1000L;
- setRecordContext(time, topic2);
// push all items to the other stream. this should produce no items.
// w1 = {}
@@ -216,10 +218,8 @@ public class KStreamKStreamLeftJoinTest {
// --> w1 = {}
// --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
for (int i = 0; i < expectedKeys.length; i++) {
- setRecordContext(time + i, topic2);
- driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], time + i));
}
- driver.flushState();
processor.checkAndClearProcessResult();
// gradually expire items in window 2.
@@ -229,81 +229,65 @@ public class KStreamKStreamLeftJoinTest {
// --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
time = 1000L + 100L;
- setRecordContext(time, topic1);
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
// go back to the time before expiration
time = 1000L - 100L - 1L;
- setRecordContext(time, topic1);
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
- setRecordContext(++time, topic1);
+ time += 1L;
for (int expectedKey : expectedKeys) {
- driver.process(topic1, expectedKey, "XX" + expectedKey);
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
- driver.flushState();
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
}
-
- private void setRecordContext(final long time, final String topic) {
- ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 1800385..5b2a797 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -16,26 +16,29 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -45,24 +48,21 @@ import static org.junit.Assert.assertEquals;
public class KStreamKTableJoinTest {
- final private String streamTopic = "streamTopic";
- final private String tableTopic = "tableTopic";
+ private final String streamTopic = "streamTopic";
+ private final String tableTopic = "tableTopic";
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final Serde<Integer> intSerde = Serdes.Integer();
+ private final Serde<String> stringSerde = Serdes.String();
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
private MockProcessorSupplier<Integer, String> processor;
private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@Before
public void setUp() {
- final File stateDir = TestUtils.tempDirectory("kafka-test");
-
builder = new StreamsBuilder();
-
final KStream<Integer, String> stream;
final KTable<Integer, String> table;
@@ -72,25 +72,31 @@ public class KStreamKTableJoinTest {
table = builder.table(tableTopic, consumed);
stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
- driver.setUp(builder, stateDir);
- driver.setTime(0L);
+ final Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
}
private void pushToStream(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
- driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
}
}
private void pushToTable(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
- driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
}
}
private void pushNullValueToTable() {
for (int i = 0; i < 2; i++) {
- driver.process(tableTopic, expectedKeys[i], null);
+ driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
}
}
@@ -188,20 +194,20 @@ public class KStreamKTableJoinTest {
@Test
public void shouldLogAndMeterWhenSkippingNullLeftKey() {
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.process(streamTopic, null, "A");
+ driver.pipeInput(recordFactory.create(streamTopic, null, "A"));
LogCaptureAppender.unregister(appender);
- assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic] partition=[-1] offset=[-1]"));
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic] partition=[0] offset=[0]"));
}
@Test
public void shouldLogAndMeterWhenSkippingNullLeftValue() {
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.process(streamTopic, 1, null);
+ driver.pipeInput(recordFactory.create(streamTopic, 1, null));
LogCaptureAppender.unregister(appender);
- assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[-1] offset=[-1]"));
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[0] offset=[0]"));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 7507d7a..669f4c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -16,26 +16,28 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Properties;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -47,17 +49,14 @@ public class KStreamKTableLeftJoinTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
private MockProcessorSupplier<Integer, String> processor;
private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@Before
- public void setUp() throws IOException {
- stateDir = TestUtils.tempDirectory("kafka-test");
-
+ public void setUp() {
builder = new StreamsBuilder();
@@ -70,25 +69,31 @@ public class KStreamKTableLeftJoinTest {
table = builder.table(tableTopic, consumed);
stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
- driver.setUp(builder, stateDir);
- driver.setTime(0L);
+ final Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
}
private void pushToStream(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
- driver.process(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
}
}
private void pushToTable(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
- driver.process(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]);
+ driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
}
}
private void pushNullValueToTable(final int messageCount) {
for (int i = 0; i < messageCount; i++) {
- driver.process(tableTopic, expectedKeys[i], null);
+ driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], null));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index eed7d7d..bb22204 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -16,18 +16,26 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Properties;
+
import static org.junit.Assert.assertEquals;
public class KStreamMapTest {
@@ -36,8 +44,27 @@ public class KStreamMapTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testMap() {
@@ -59,9 +86,9 @@ public class KStreamMapTest {
processor = new MockProcessorSupplier<>();
stream.map(mapper).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, "V" + expectedKey);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
}
assertEquals(4, processor.processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 2cedfb4..17a13e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -16,18 +16,26 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Properties;
+
import static org.junit.Assert.assertArrayEquals;
public class KStreamMapValuesTest {
@@ -36,8 +44,27 @@ public class KStreamMapValuesTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-map-values-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testFlatMapValues() {
@@ -58,9 +85,9 @@ public class KStreamMapValuesTest {
stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
stream.mapValues(mapper).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, Integer.toString(expectedKey));
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
}
String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
@@ -86,9 +113,9 @@ public class KStreamMapValuesTest {
stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
stream.mapValues(mapper).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, Integer.toString(expectedKey));
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
}
String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 215b0c3..2c6ff81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -16,20 +16,26 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.test.KStreamTestDriver;
-
-import org.junit.Rule;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -39,8 +45,27 @@ public class KStreamPeekTest {
private final String topicName = "topic";
private final Serde<Integer> intSerd = Serdes.Integer();
private final Serde<String> stringSerd = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-peek-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void shouldObserveStreamElements() {
@@ -49,11 +74,11 @@ public class KStreamPeekTest {
final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
final List<KeyValue<Integer, String>> expected = new ArrayList<>();
for (int key = 0; key < 32; key++) {
final String value = "V" + key;
- driver.process(topicName, key, value);
+ driver.pipeInput(recordFactory.create(topicName, key, value));
expected.add(new KeyValue<>(key, value));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index f4f340f..0bf6452 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -16,20 +16,27 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -39,8 +46,27 @@ public class KStreamSelectKeyTest {
final private Serde<Integer> integerSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(topicName, new StringSerializer(), new IntegerSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-select-key-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testSelectKey() {
@@ -68,10 +94,10 @@ public class KStreamSelectKeyTest {
stream.selectKey(selector).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
for (int expectedValue : expectedValues) {
- driver.process(topicName, null, expectedValue);
+ driver.pipeInput(recordFactory.create(expectedValue));
}
assertEquals(3, processor.processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index df3ceaf..aa0cf7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -16,20 +16,31 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import java.util.Properties;
+
import static org.junit.Assert.assertEquals;
public class KStreamTransformTest {
@@ -37,8 +48,30 @@ public class KStreamTransformTest {
private String topicName = "topic";
final private Serde<Integer> intSerde = Serdes.Integer();
+ private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
@Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ public final KStreamTestDriver kstreamDriver = new KStreamTestDriver();
+
+ @Before
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testTransform() {
@@ -79,13 +112,77 @@ public class KStreamTransformTest {
KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
stream.transform(transformerSupplier).process(processor);
- driver.setUp(builder);
+ kstreamDriver.setUp(builder);
+ for (int expectedKey : expectedKeys) {
+ kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
+ }
+
+ kstreamDriver.punctuate(2);
+ kstreamDriver.punctuate(3);
+
+ assertEquals(6, processor.processed.size());
+
+ String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], processor.processed.get(i));
+ }
+ }
+
+ @Test
+ public void testTransformWithNewDriverAndPunctuator() {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
+ new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+ public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
+ return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
+
+ private int total = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+ @Override
+ public void punctuate(long timestamp) {
+ context.forward(-1, (int) timestamp);
+ }
+ });
+ }
+
+ @Override
+ public KeyValue<Integer, Integer> transform(Number key, Number value) {
+ total += value.intValue();
+ return KeyValue.pair(key.intValue() * 2, total);
+ }
+
+ @Override
+ public KeyValue<Integer, Integer> punctuate(long timestamp) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ }
+ };
+
+ final int[] expectedKeys = {1, 10, 100, 1000};
+
+ MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
+ KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
+ stream.transform(transformerSupplier).process(processor);
+
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, expectedKey * 10);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
}
- driver.punctuate(2);
- driver.punctuate(3);
+ // This tick will yield yields the "-1:2" result
+ driver.advanceWallClockTime(2);
+ // This tick further advances the clock to 3, which leads to the "-1:3" result
+ driver.advanceWallClockTime(1);
assertEquals(6, processor.processed.size());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index dc0b886..59a6a21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -29,11 +32,15 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Properties;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
@@ -42,8 +49,27 @@ public class KStreamTransformValuesTest {
private String topicName = "topic";
final private Serde<Integer> intSerde = Serdes.Integer();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
+
+ @Before
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
+ }
@Test
public void testTransform() {
@@ -85,9 +111,10 @@ public class KStreamTransformValuesTest {
stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
stream.transformValues(valueTransformerSupplier).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
+
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, expectedKey * 10);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
}
String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
@@ -128,9 +155,10 @@ public class KStreamTransformValuesTest {
stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
stream.transformValues(valueTransformerSupplier).process(processor);
- driver.setUp(builder);
+ driver = new TopologyTestDriver(builder.build(), props);
+
for (int expectedKey : expectedKeys) {
- driver.process(topicName, expectedKey, expectedKey * 10);
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
}
String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 7082251..fc31db9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -18,10 +18,13 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -29,20 +32,18 @@ import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
+import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
@@ -52,13 +53,26 @@ import static org.junit.Assert.assertEquals;
public class KStreamWindowAggregateTest {
final private Serde<String> strSerde = Serdes.String();
- private File stateDir = null;
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
@Before
- public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-window-aggregate-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@Test
@@ -74,55 +88,24 @@ public class KStreamWindowAggregateTest {
final MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
- driver.setUp(builder, stateDir);
-
- setRecordContext(0, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(1, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(2, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
- setRecordContext(3, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(4, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
-
- setRecordContext(5, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(6, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(7, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(8, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(9, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
- setRecordContext(10, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(11, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(12, topic1);
- driver.flushState();
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(13, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(14, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
assertEquals(
@@ -149,10 +132,6 @@ public class KStreamWindowAggregateTest {
);
}
- private void setRecordContext(final long time, @SuppressWarnings("SameParameterValue") final String topic) {
- ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
- }
-
@Test
public void testJoin() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -183,23 +162,13 @@ public class KStreamWindowAggregateTest {
}
}).toStream().process(proc3);
- driver.setUp(builder, stateDir);
-
- setRecordContext(0, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(1, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(2, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
- setRecordContext(3, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(4, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
proc1.checkAndClearProcessResult(
"[A@0/10]:0+1",
@@ -211,21 +180,11 @@ public class KStreamWindowAggregateTest {
proc2.checkAndClearProcessResult();
proc3.checkAndClearProcessResult();
- setRecordContext(5, topic1);
- driver.process(topic1, "A", "1");
- driver.flushState();
- setRecordContext(6, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(7, topic1);
- driver.process(topic1, "D", "4");
- driver.flushState();
- setRecordContext(8, topic1);
- driver.process(topic1, "B", "2");
- driver.flushState();
- setRecordContext(9, topic1);
- driver.process(topic1, "C", "3");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
proc1.checkAndClearProcessResult(
"[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
@@ -237,21 +196,11 @@ public class KStreamWindowAggregateTest {
proc2.checkAndClearProcessResult();
proc3.checkAndClearProcessResult();
- setRecordContext(0, topic1);
- driver.process(topic2, "A", "a");
- driver.flushState();
- setRecordContext(1, topic1);
- driver.process(topic2, "B", "b");
- driver.flushState();
- setRecordContext(2, topic1);
- driver.process(topic2, "C", "c");
- driver.flushState();
- setRecordContext(3, topic1);
- driver.process(topic2, "D", "d");
- driver.flushState();
- setRecordContext(4, topic1);
- driver.process(topic2, "A", "a");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
+ driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
+ driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
+ driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
proc1.checkAndClearProcessResult();
proc2.checkAndClearProcessResult(
@@ -268,21 +217,12 @@ public class KStreamWindowAggregateTest {
"[D@0/10]:0+4+4%0+d",
"[A@0/10]:0+1+1+1%0+a+a");
- setRecordContext(5, topic1);
- driver.process(topic2, "A", "a");
- driver.flushState();
- setRecordContext(6, topic1);
- driver.process(topic2, "B", "b");
- driver.flushState();
- setRecordContext(7, topic1);
- driver.process(topic2, "D", "d");
- driver.flushState();
- setRecordContext(8, topic1);
- driver.process(topic2, "B", "b");
- driver.flushState();
- setRecordContext(9, topic1);
- driver.process(topic2, "C", "c");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
+ driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
+ driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
+ driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
+ driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
+
proc1.checkAndClearProcessResult();
proc2.checkAndClearProcessResult(
"[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
@@ -314,15 +254,13 @@ public class KStreamWindowAggregateTest {
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(strSerde)
);
- driver.setUp(builder, stateDir);
+ driver = new TopologyTestDriver(builder.build(), props, 0L);
- setRecordContext(0, topic);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.process(topic, null, "1");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic, null, "1"));
LogCaptureAppender.unregister(appender);
- assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-1] offset=[-1]"));
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index d8b3a5f..30c0a7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -16,27 +16,31 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -44,15 +48,28 @@ import static org.junit.Assert.assertEquals;
public class KTableForeachTest {
final private String topicName = "topic";
- private File stateDir = null;
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+ private TopologyTestDriver driver;
+ private final Properties props = new Properties();
@Before
- public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
+ public void setup() {
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-foreach-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+ props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ }
+
+ @After
+ public void cleanup() {
+ props.clear();
+ if (driver != null) {
+ driver.close();
+ }
+ driver = null;
}
@Test
@@ -91,11 +108,11 @@ public class KTableForeachTest {
table.foreach(action);
// Then
- driver.setUp(builder, stateDir);
+ driver = new TopologyTestDriver(builder.build(), props);
+
for (KeyValue<Integer, String> record: inputRecords) {
- driver.process(topicName, record.key, record.value);
+ driver.pipeInput(recordFactory.create(topicName, record.key, record.value));
}
- driver.flushState();
assertEquals(expectedRecords.size(), actualRecords.size());
for (int i = 0; i < expectedRecords.size(); i++) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f67b634..f948c2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.processor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
@@ -34,7 +36,6 @@ import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -605,16 +606,12 @@ public class TopologyBuilderTest {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource(sourceNodeName, "topic")
- .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
- sourceNodeName)
- .addStateStore(
- Stores.create(LocalMockProcessorSupplier.STORE_NAME)
- .withStringKeys().withStringValues().inMemory()
- .build(), goodNodeName)
- .addProcessor(badNodeName, new LocalMockProcessorSupplier(),
- sourceNodeName);
+ .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+ .addStateStore(Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), goodNodeName)
+ .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
try {
- final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
+ final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, config);
+ driver.pipeInput(new ConsumerRecord<>("topic", 0, 0L, new byte[] {}, new byte[] {}));
fail("Should have thrown StreamsException");
} catch (final StreamsException e) {
final String error = e.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 901fc4b..25468c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
@@ -34,7 +35,6 @@ import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -581,7 +581,7 @@ public class InternalTopologyBuilderTest {
builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
try {
- new ProcessorTopologyTestDriver(streamsConfig, builder);
+ new TopologyTestDriverWrapper(builder, config);
fail("Should have throw StreamsException");
} catch (final StreamsException e) {
final String error = e.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index d07274a..a80b25d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -36,8 +37,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -66,26 +67,26 @@ public class ProcessorTopologyTest {
private final TopologyBuilder builder = new TopologyBuilder();
private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
- private ProcessorTopologyTestDriver driver;
- private StreamsConfig config;
+ private TopologyTestDriverWrapper driver;
+ private final Properties props = new Properties();
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
final File localState = TestUtils.tempDirectory();
- final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- this.config = new StreamsConfig(props);
}
@After
public void cleanup() {
+ props.clear();
if (driver != null) {
driver.close();
}
@@ -122,19 +123,19 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingSimpleTopology() {
- final int partition = 10;
- driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+ int partition = 10;
+ driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
assertNoOutputRecord(OUTPUT_TOPIC_2);
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition);
assertNoOutputRecord(OUTPUT_TOPIC_2);
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key4", "value4"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key5", "value5"));
assertNoOutputRecord(OUTPUT_TOPIC_2);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4", partition);
@@ -144,18 +145,18 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingMultiplexingTopology() {
- driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver = new TopologyTestDriverWrapper(createMultiplexingTopology().internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key4", "value4"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key5", "value5"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -166,18 +167,18 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingMultiplexByNameTopology() {
- driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology().internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key4", "value4"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key5", "value5"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -188,12 +189,12 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingStatefulTopology() {
- final String storeName = "entries";
- driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+ String storeName = "entries";
+ driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName).internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value4"));
assertNoOutputRecord(OUTPUT_TOPIC_1);
final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
@@ -214,10 +215,10 @@ public class ProcessorTopologyTest {
final TopologyBuilder topologyBuilder = this.builder
.addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
- driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder);
- final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get(storeName);
- driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver = new TopologyTestDriverWrapper(topologyBuilder.internalTopologyBuilder, props);
+ final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+ driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
+ driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
assertEquals("value1", globalStore.get("key1"));
assertEquals("value2", globalStore.get("key2"));
}
@@ -225,23 +226,23 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingSimpleMultiSourceTopology() {
final int partition = 10;
- driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder);
+ driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition).internalTopologyBuilder, props);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
assertNoOutputRecord(OUTPUT_TOPIC_2);
- driver.process(INPUT_TOPIC_2, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_2, "key2", "value2"));
assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition);
assertNoOutputRecord(OUTPUT_TOPIC_1);
}
@Test
public void testDrivingForwardToSourceTopology() {
- driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver = new TopologyTestDriverWrapper(createForwardToSourceTopology().internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3");
@@ -249,10 +250,10 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingInternalRepartitioningTopology() {
- driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology().internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1");
assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2");
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3");
@@ -260,10 +261,10 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
- driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC_1, "key3", "value3@3000", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1@1000"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2@2000"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3@3000"));
assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER),
equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 1000L, "key1", "value1")));
assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER),
@@ -319,10 +320,10 @@ public class ProcessorTopologyTest {
@Test
public void shouldConsiderTimeStamps() {
final int partition = 10;
- driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+ driver = new TopologyTestDriverWrapper(createSimpleTopology(partition).internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L);
@@ -331,10 +332,10 @@ public class ProcessorTopologyTest {
@Test
public void shouldConsiderModifiedTimeStamps() {
final int partition = 10;
- driver = new ProcessorTopologyTestDriver(config, createTimestampTopology(partition).internalTopologyBuilder);
- driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
- driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
- driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+ driver = new TopologyTestDriverWrapper(createTimestampTopology(partition).internalTopologyBuilder, props);
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 20L);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 30L);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 3daf051..c93a306 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -44,6 +44,7 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+@Deprecated
public class KStreamTestDriver extends ExternalResource {
private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
deleted file mode 100644
index afd2bb2..0000000
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
-import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.StateDirectory;
-import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
-import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class makes it easier to write tests to verify the behavior of topologies created with a {@link Topology}.
- * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors,
- * and sinks. And because it starts with a {@link Topology}, you can create topologies specific to your tests or you
- * can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real
- * Kafka broker, so the tests execute very quickly with very little overhead.
- * <p>
- * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamsConfig} and a
- * TopologyBuilder, use the driver to supply an input message to the topology, and then use the driver to read and verify any
- * messages output by the topology.
- * <p>
- * Although the driver doesn't use a real Kafka broker, it does simulate Kafka {@link org.apache.kafka.clients.consumer.Consumer}s
- * and {@link org.apache.kafka.clients.producer.Producer}s that read and write raw {@code byte[]} messages. You can either deal
- * with messages that have {@code byte[]} keys and values, or you can supply the {@link Serializer}s and {@link Deserializer}s
- * that the driver can use to convert the keys and values into objects.
- *
- * <h2>Driver setup</h2>
- * <p>
- * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamsConfig}. The
- * configuration needs to be representative of what you'd supply to the real topology, so that means including several key
- * properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
- * (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
- *
- * <pre>
- * StringSerializer strSerializer = new StringSerializer();
- * StringDeserializer strDeserializer = new StringDeserializer();
- * Properties props = new Properties();
- * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- * props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * StreamsConfig config = new StreamsConfig(props);
- * TopologyBuilder builder = ...
- * ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
- * </pre>
- *
- * <h2>Processing messages</h2>
- * <p>
- * Your test can supply new input records on any of the topics that the topology's sources consume. Here's an example of an
- * input message on the topic named {@code input-topic}:
- *
- * <pre>
- * driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
- * </pre>
- *
- * Immediately, the driver will pass the input message through to the appropriate source that consumes the named topic,
- * and will invoke the processor(s) downstream of the source. If your topology's processors forward messages to sinks,
- * your test can then consume these output messages to verify they match the expected outcome. For example, if our topology
- * should have generated 2 messages on {@code output-topic-1} and 1 message on {@code output-topic-2}, then our test can
- * obtain these messages using the {@link #readOutput(String, Deserializer, Deserializer)} method:
- *
- * <pre>
- * ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
- * ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
- * ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
- * </pre>
- *
- * Again, our example topology generates messages with string keys and values, so we supply our string deserializer instance
- * for use on both the keys and values. Your test logic can then verify whether these output records are correct.
- * <p>
- * Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
- * {@link org.apache.kafka.streams.processor.Processor}s.
- *
- * <h2>Processor state</h2>
- * <p>
- * Some processors use Kafka {@link StateStore state storage}, so this driver class provides the {@link #getStateStore(String)}
- * and {@link #getKeyValueStore(String)} methods so that your tests can check the underlying state store(s) used by your
- * topology's processors. In our previous example, after we supplied a single input message and checked the three output messages,
- * our test could also check the key value store to verify the processor correctly added, removed, or updated internal state.
- * Or, our test might have pre-populated some state <em>before</em> submitting the input message, and verified afterward that the
- * processor(s) correctly updated the state.
- */
-public class ProcessorTopologyTestDriver {
-
- private final static String APPLICATION_ID = "test-driver-application";
- private final static int PARTITION_ID = 0;
- private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
-
- private final ProcessorTopology topology;
- private final MockProducer<byte[], byte[]> producer;
- private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
- private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
- private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
- private final Set<String> internalTopics = new HashSet<>();
- private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
- private StreamTask task;
- private GlobalStateUpdateTask globalStateTask;
-
- /**
- * Create a new test driver instance.
- *
- * @param config the stream configuration for the topology
- * @param builder the topology builder that will be used to create the topology instance
- */
- public ProcessorTopologyTestDriver(final StreamsConfig config,
- final InternalTopologyBuilder builder) {
- topology = builder.setApplicationId(APPLICATION_ID).build(null);
- final ProcessorTopology globalTopology = builder.buildGlobalStateTopology();
-
- // Set up the consumer and producer ...
- final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
- producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
- @Override
- public List<PartitionInfo> partitionsFor(final String topic) {
- return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
- }
- };
-
- // Identify internal topics for forwarding in process ...
- for (final InternalTopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
- internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
- }
-
- // Set up all of the topic+partition information and subscribe the consumer to each ...
- for (final String topic : topology.sourceTopics()) {
- final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
- partitionsByTopic.put(topic, tp);
- offsetsByTopicPartition.put(tp, new AtomicLong());
- }
-
- consumer.assign(offsetsByTopicPartition.keySet());
-
- final StateDirectory stateDirectory = new StateDirectory(config, Time.SYSTEM);
- final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
- final ThreadCache cache = new ThreadCache(new LogContext("mock "), 1024 * 1024, streamsMetrics);
-
- if (globalTopology != null) {
- final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
- final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
- for (final String topicName : globalTopology.sourceTopics()) {
- final List<PartitionInfo> partitionInfos = new ArrayList<>();
- partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
- globalConsumer.updatePartitions(topicName, partitionInfos);
- final TopicPartition partition = new TopicPartition(topicName, 1);
- globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
- globalPartitionsByTopic.put(topicName, partition);
- offsetsByTopicPartition.put(partition, new AtomicLong());
- }
- final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(
- new LogContext("mock "),
- globalTopology,
- globalConsumer,
- stateDirectory,
- stateRestoreListener,
- config);
- final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache);
- stateManager.setGlobalProcessorContext(globalProcessorContext);
- globalStateTask = new GlobalStateUpdateTask(
- globalTopology,
- globalProcessorContext,
- stateManager,
- new LogAndContinueExceptionHandler(),
- new LogContext());
- globalStateTask.initialize();
- }
-
- if (!partitionsByTopic.isEmpty()) {
- task = new StreamTask(
- TASK_ID,
- partitionsByTopic.values(),
- topology,
- consumer,
- new StoreChangelogReader(
- createRestoreConsumer(topology.storeToChangelogTopic()),
- new MockStateRestoreListener(),
- new LogContext("topology-test-driver ")
- ),
- config,
- streamsMetrics,
- stateDirectory,
- cache,
- new MockTime(),
- producer
- );
- task.initializeStateStores();
- task.initializeTopology();
- }
- }
-
- /**
- * Send an input message with the given key, value and timestamp on the specified topic to the topology, and then commit the messages.
- *
- * @param topicName the name of the topic on which the message is to be sent
- * @param key the raw message key
- * @param value the raw message value
- * @param timestamp the raw message timestamp
- */
- public void process(final String topicName,
- final byte[] key,
- final byte[] value,
- final long timestamp) {
-
- final TopicPartition tp = partitionsByTopic.get(topicName);
- if (tp != null) {
- // Add the record ...
- final long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
- task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
- producer.clear();
-
- // Process the record ...
- task.process();
- ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(timestamp, offset, tp.partition(), topicName));
- task.commit();
-
- // Capture all the records sent to the producer ...
- for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
- Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
- if (outputRecords == null) {
- outputRecords = new LinkedList<>();
- outputRecordsByTopic.put(record.topic(), outputRecords);
- }
- outputRecords.add(record);
-
- // Forward back into the topology if the produced record is to an internal or a source topic ...
- if (internalTopics.contains(record.topic()) || topology.sourceTopics().contains(record.topic())) {
- process(record.topic(), record.key(), record.value(), record.timestamp());
- }
- }
- } else {
- final TopicPartition global = globalPartitionsByTopic.get(topicName);
- if (global == null) {
- throw new IllegalArgumentException("Unexpected topic: " + topicName);
- }
- final long offset = offsetsByTopicPartition.get(global).incrementAndGet();
- globalStateTask.update(new ConsumerRecord<>(global.topic(), global.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value));
- globalStateTask.flushState();
- }
- }
-
- /**
- * Send an input message with the given key and value on the specified topic to the topology.
- *
- * @param topicName the name of the topic on which the message is to be sent
- * @param key the raw message key
- * @param value the raw message value
- */
- public void process(final String topicName,
- final byte[] key,
- final byte[] value) {
- process(topicName, key, value, 0L);
- }
-
- /**
- * Send an input message with the given key and value on the specified topic to the topology.
- *
- * @param topicName the name of the topic on which the message is to be sent
- * @param key the raw message key
- * @param value the raw message value
- * @param keySerializer the serializer for the key
- * @param valueSerializer the serializer for the value
- */
- public <K, V> void process(final String topicName,
- final K key,
- final V value,
- final Serializer<K> keySerializer,
- final Serializer<V> valueSerializer) {
- process(topicName, key, value, keySerializer, valueSerializer, 0L);
- }
-
- /**
- * Send an input message with the given key and value and timestamp on the specified topic to the topology.
- *
- * @param topicName the name of the topic on which the message is to be sent
- * @param key the raw message key
- * @param value the raw message value
- * @param keySerializer the serializer for the key
- * @param valueSerializer the serializer for the value
- * @param timestamp the raw message timestamp
- */
- public <K, V> void process(final String topicName,
- final K key,
- final V value,
- final Serializer<K> keySerializer,
- final Serializer<V> valueSerializer,
- final long timestamp) {
- process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value), timestamp);
- }
-
- /**
- * Read the next record from the given topic. These records were output by the topology during the previous calls to
- * {@link #process(String, byte[], byte[])}.
- *
- * @param topic the name of the topic
- * @return the next record on that topic, or null if there is no record available
- */
- public ProducerRecord<byte[], byte[]> readOutput(final String topic) {
- final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
- if (outputRecords == null) {
- return null;
- }
- return outputRecords.poll();
- }
-
- /**
- * Read the next record from the given topic. These records were output by the topology during the previous calls to
- * {@link #process(String, byte[], byte[])}.
- *
- * @param topic the name of the topic
- * @param keyDeserializer the deserializer for the key type
- * @param valueDeserializer the deserializer for the value type
- * @return the next record on that topic, or null if there is no record available
- */
- public <K, V> ProducerRecord<K, V> readOutput(final String topic,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer) {
- final ProducerRecord<byte[], byte[]> record = readOutput(topic);
- if (record == null) {
- return null;
- }
- final K key = keyDeserializer.deserialize(record.topic(), record.key());
- final V value = valueDeserializer.deserialize(record.topic(), record.value());
- return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
- }
-
- private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]> record) {
- return Collections.singleton(record);
- }
-
- /**
- * Get the {@link StateStore} with the given name. The name should have been supplied via
- * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is
- * presumed to be used by a Processor within the topology.
- * <p>
- * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
- * {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
- *
- * @param name the name of the store
- * @return the state store, or null if no store has been registered with the given name
- * @see #getKeyValueStore(String)
- */
- public StateStore getStateStore(final String name) {
- return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
- }
-
- /**
- * Get the {@link KeyValueStore} with the given name. The name should have been supplied via
- * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is
- * presumed to be used by a Processor within the topology.
- * <p>
- * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
- * {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
- * <p>
- *
- * @param name the name of the store
- * @return the key value store, or null if no {@link KeyValueStore} has been registered with the given name
- * @see #getStateStore(String)
- */
- @SuppressWarnings("unchecked")
- public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
- final StateStore store = getStateStore(name);
- return store instanceof KeyValueStore ? (KeyValueStore<K, V>) getStateStore(name) : null;
- }
-
- /**
- * Close the driver, its topology, and all processors.
- */
- public void close() {
- if (task != null) {
- task.close(true, false);
- }
- if (globalStateTask != null) {
- try {
- globalStateTask.close();
- } catch (final IOException e) {
- // ignore
- }
- }
- }
-
- /**
- * Utility method that creates the {@link MockConsumer} used for restoring state, which should not be done by this
- * driver object unless this method is overwritten with a functional consumer.
- *
- * @param storeToChangelogTopic the map of the names of the stores to the changelog topics
- * @return the mock consumer; never null
- */
- private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
- final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
- @Override
- public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
-
- @Override
- public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
-
- @Override
- public synchronized long position(final TopicPartition partition) {
- return 0L;
- }
- };
- // For each store ...
- for (final Map.Entry<String, String> storeAndTopic : storeToChangelogTopic.entrySet()) {
- final String topicName = storeAndTopic.getValue();
- // Set up the restore-state topic ...
- // consumer.subscribe(new TopicPartition(topicName, 1));
- // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
- final List<PartitionInfo> partitionInfos = new ArrayList<>();
- partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
- consumer.updatePartitions(topicName, partitionInfos);
- consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
- }
- return consumer;
- }
-
- private MockConsumer<byte[], byte[]> createGlobalConsumer() {
- return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
- @Override
- public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
-
- @Override
- public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
-
- @Override
- public synchronized long position(final TopicPartition partition) {
- return 0L;
- }
- };
- }
-
-}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index bde70b4..7343d59 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -216,10 +216,36 @@ public class TopologyTestDriver implements Closeable {
public TopologyTestDriver(final Topology topology,
final Properties config,
final long initialWallClockTimeMs) {
+
+ this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
+ }
+
+ /**
+ * Create a new test diver instance.
+ *
+ * @param builder builder for the topology to be tested
+ * @param config the configuration for the topology
+ */
+ protected TopologyTestDriver(final InternalTopologyBuilder builder,
+ final Properties config) {
+ this(builder, config, System.currentTimeMillis());
+
+ }
+
+ /**
+ * Create a new test diver instance.
+ *
+ * @param builder builder for the topology to be tested
+ * @param config the configuration for the topology
+ * @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
+ */
+ private TopologyTestDriver(final InternalTopologyBuilder builder,
+ final Properties config,
+ final long initialWallClockTimeMs) {
final StreamsConfig streamsConfig = new StreamsConfig(config);
mockTime = new MockTime(initialWallClockTimeMs);
- internalTopologyBuilder = topology.internalTopologyBuilder;
+ internalTopologyBuilder = builder;
internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
processorTopology = internalTopologyBuilder.build(null);
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.