You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/30 01:21:56 UTC
[kafka] branch trunk updated: KAFKA-3625: Add public test utils for
Kafka Streams (#4402)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a78f66a KAFKA-3625: Add public test utils for Kafka Streams (#4402)
a78f66a is described below
commit a78f66a5ae7865418ee482c535e883598e1ec51f
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Jan 29 17:21:48 2018 -0800
KAFKA-3625: Add public test utils for Kafka Streams (#4402)
* KAFKA-3625: Add public test utils for Kafka Streams
- add new artifact test-utils
- add TopologyTestDriver
- add MockTime, TestRecord, add TestRecordFactory
Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>
---
build.gradle | 36 +-
checkstyle/suppressions.xml | 8 +-
.../kafka/clients/producer/ProducerRecord.java | 2 +-
docs/streams/upgrade-guide.html | 6 +
settings.gradle | 2 +-
.../kafka/streams/processor/ProcessorContext.java | 4 +-
.../internals/InternalTopologyBuilder.java | 6 +
.../streams/processor/internals/StreamTask.java | 4 +-
.../state/internals/QueryableStoreProvider.java | 2 +-
.../state/internals/WrappingStoreProvider.java | 2 +-
.../kafka/streams/InternalTopologyAccessor.java | 2 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 6 +-
.../apache/kafka/streams/TopologyTestDriver.java | 624 +++++++++++++++++++
.../kafka/streams/test/ConsumerRecordFactory.java | 415 ++++++++++++
.../apache/kafka/streams/test/OutputVerifier.java | 241 +++++++
.../org/apache/kafka/streams/MockTimeTest.java | 56 ++
.../kafka/streams/TopologyTestDriverTest.java | 692 +++++++++++++++++++++
.../streams/test/ConsumerRecordFactoryTest.java | 265 ++++++++
.../kafka/streams/test/OutputVerifierTest.java | 584 +++++++++++++++++
19 files changed, 2943 insertions(+), 14 deletions(-)
diff --git a/build.gradle b/build.gradle
index 725cf0b..aedd943 100644
--- a/build.gradle
+++ b/build.gradle
@@ -513,7 +513,7 @@ for ( sv in availableScalaVersions ) {
}
def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:test-utils', 'streams:examples'] + connectPkgs
/** Create one task per default Scala version */
def withDefScalaVersions(taskName) {
@@ -729,6 +729,8 @@ project(':core') {
from(project(':connect:file').configurations.runtime) { into("libs/") }
from(project(':streams').jar) { into("libs/") }
from(project(':streams').configurations.runtime) { into("libs/") }
+ from(project(':streams:test-utils').jar) { into("libs/") }
+ from(project(':streams:test-utils').configurations.runtime) { into("libs/") }
from(project(':streams:examples').jar) { into("libs/") }
from(project(':streams:examples').configurations.runtime) { into("libs/") }
duplicatesStrategy 'exclude'
@@ -954,6 +956,38 @@ project(':streams') {
}
}
+project(':streams:test-utils') {
+ archivesBaseName = "kafka-streams-test-utils"
+
+ dependencies {
+ compile project(':streams')
+ compile project(':clients')
+
+ testCompile project(':clients').sourceSets.test.output
+ testCompile libs.junit
+
+ testRuntime libs.slf4jlog4j
+ }
+
+ javadoc {
+ include "**/org/apache/kafka/streams/test/**"
+ exclude "**/internals/**"
+ }
+
+ tasks.create(name: "copyDependantLibs", type: Copy) {
+ from (configurations.runtime) {
+ exclude('kafka-streams*')
+ }
+ into "$buildDir/dependant-libs-${versions.scala}"
+ duplicatesStrategy 'exclude'
+ }
+
+ jar {
+ dependsOn 'copyDependantLibs'
+ }
+
+}
+
project(':streams:examples') {
archivesBaseName = "kafka-streams-examples"
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 664a3f6..c6c2d23 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -153,7 +153,7 @@
<suppress checks="NPathComplexity"
files="StreamThread.java"/>
- <!-- streams tests -->
+ <!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"
files="(StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/>
@@ -185,6 +185,12 @@
<suppress checks="NPathComplexity"
files="KStreamKStreamLeftJoinTest.java"/>
+ <!-- Streams Test-Utils -->
+ <suppress checks="ClassFanOutComplexity"
+ files="TopologyTestDriver.java"/>
+ <suppress checks="ClassDataAbstractionCoupling"
+ files="TopologyTestDriver.java"/>
+
<!-- Tools -->
<suppress checks="ClassDataAbstractionCoupling"
files="VerifiableConsumer.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index c8ff00b..5d6bb3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -104,7 +104,7 @@ public class ProducerRecord<K, V> {
* @param value The record contents
* @param headers The headers that will be included in the record
*/
- public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
+ public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
}
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 11c3a7e..1d5a342 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -71,6 +71,12 @@
<p>
+ There is a new artifact <code>kafka-streams-test-utils</code> providing a <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> class.
+ You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application.
+ For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams">KIP-247</a>.
+ </p>
+
+ <p>
The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier">KIP-220</a>
enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients.
You can provide the configs via <code>StreamsConfig</code> by adding the configs with the prefix <code>admin.</code> as defined by <code>StreamsConfig#adminClientPrefix(String)</code>
diff --git a/settings.gradle b/settings.gradle
index f0fdf07..e599d01 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples', 'log4j-appender',
'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 385d641..d4393aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -109,12 +109,12 @@ public interface ProcessorContext {
* by how long an iteration of the processing loop takes to complete</li>
* </ul>
*
- * @param interval the time interval between punctuations
+ * @param intervalMs the time interval between punctuations in milliseconds
* @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
* @param callback a function consuming timestamps representing the current stream or system time
* @return a handle allowing cancellation of the punctuation schedule established by this method
*/
- Cancellable schedule(long interval, PunctuationType type, Punctuator callback);
+ Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
/**
* Schedules a periodic operation for processors. A processor may call this method during
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 2402d2e..7d4b592 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1019,6 +1019,12 @@ public class InternalTopologyBuilder {
return Collections.unmodifiableMap(globalStateStores);
}
+ public Set<String> allStateStoreName() {
+ final Set<String> allNames = new HashSet<>(stateFactories.keySet());
+ allNames.addAll(globalStateStores.keySet());
+ return Collections.unmodifiableSet(allNames);
+ }
+
/**
* Returns the map of topic groups keyed by the group id.
* A topic group is a group of topics in the same task.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e293f25..11b2f89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -607,7 +607,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* Note, this is only called in the presence of new records
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
- boolean maybePunctuateStreamTime() {
+ public boolean maybePunctuateStreamTime() {
final long timestamp = partitionGroup.timestamp();
// if the timestamp is not known yet, meaning there is not enough data accumulated
@@ -625,7 +625,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* Note, this is called irrespective of the presence of new records
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
- boolean maybePunctuateSystemTime() {
+ public boolean maybePunctuateSystemTime() {
final long timestamp = time.milliseconds();
return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index b439ad5..971dcd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -57,7 +57,7 @@ public class QueryableStoreProvider {
allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
}
if (allStores.isEmpty()) {
- throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
+ throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index fdc01c6..36d0173 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -50,7 +50,7 @@ public class WrappingStoreProvider implements StateStoreProvider {
allStores.addAll(stores);
}
if (allStores.isEmpty()) {
- throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
+ throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
return allStores;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java b/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
index a6144f2..c3c4504 100644
--- a/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
+++ b/streams/src/test/java/org/apache/kafka/streams/InternalTopologyAccessor.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
*/
public class InternalTopologyAccessor {
- public static InternalTopologyBuilder getInternalTopologyBuilder(Topology topology) {
+ public static InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) {
return topology.internalTopologyBuilder;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index e5f15bc..82c39ee 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -262,9 +262,9 @@ public class ProcessorTopologyTestDriver {
* @param timestamp the raw message timestamp
*/
public void process(final String topicName,
- final byte[] key,
- final byte[] value,
- final long timestamp) {
+ final byte[] key,
+ final byte[] value,
+ final long timestamp) {
final TopicPartition tp = partitionsByTopic.get(topicName);
if (tp != null) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
new file mode 100644
index 0000000..6168640
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class makes it easier to write tests to verify the behavior of topologies created with {@link Topology} or
+ * {@link StreamsBuilder}.
+ * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources,
+ * processors, sinks, or sub-topologies.
+ * Best of all, the class works without a real Kafka broker, so the tests execute very quickly with very little overhead.
+ * <p>
+ * Using the {@code TopologyTestDriver} in tests is easy: simply instantiate the driver and provide a {@link Topology}
+ * (cf. {@link StreamsBuilder#build()}) and {@link Properties configs}, use the driver to supply an
+ * input message to the topology, and then use the driver to read and verify any messages output by the topology.
+ * <p>
+ * Although the driver doesn't use a real Kafka broker, it does simulate Kafka {@link Consumer consumers} and
+ * {@link Producer producers} that read and write raw {@code byte[]} messages.
+ * You can either deal with messages that have {@code byte[]} keys and values or you use {@link ConsumerRecordFactory}
+ * and {@link OutputVerifier} that work with regular Java objects instead of raw bytes.
+ *
+ * <h2>Driver setup</h2>
+ * In order to create a {@code TopologyTestDriver} instance, you need a {@link Topology} and a {@link Properties config}.
+ * The configuration needs to be representative of what you'd supply to the real topology, so that means including
+ * several key properties (cf. {@link StreamsConfig}).
+ * For example, the following code fragment creates a configuration that specifies a local Kafka broker list (which is
+ * needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
+ *
+ * <pre>{@code
+ * Properties props = new Properties();
+ * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ * props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+ * props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ * props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ * Topology topology = ...
+ * TopologyTestDriver driver = new TopologyTestDriver(topology, props);
+ * }</pre>
+ *
+ * <h2>Processing messages</h2>
+ * <p>
+ * Your test can supply new input records on any of the topics that the topology's sources consume.
+ * This test driver simulates single-partitioned input topics.
+ * Here's an example of an input message on the topic named {@code input-topic}:
+ *
+ * <pre>
+ * ConsumerRecordFactory factory = new ConsumerRecordFactory(strSerializer, strSerializer);
+ * driver.pipeInput(factory.create("input-topic","key1", "value1"));
+ * </pre>
+ *
+ * When {@code #pipeInput()} is called, the driver passes the input message through to the appropriate source that
+ * consumes the named topic, and will invoke the processor(s) downstream of the source.
+ * If your topology's processors forward messages to sinks, your test can then consume these output messages to verify
+ * they match the expected outcome.
+ * For example, if our topology should have generated 2 messages on {@code output-topic-1} and 1 message on
+ * {@code output-topic-2}, then our test can obtain these messages using the
+ * {@link #readOutput(String, Deserializer, Deserializer)} method:
+ *
+ * <pre>{@code
+ * ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
+ * ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
+ * ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
+ * }</pre>
+ *
+ * Again, our example topology generates messages with string keys and values, so we supply our string deserializer
+ * instance for use on both the keys and values. Your test logic can then verify whether these output records are
+ * correct.
+ * Note, that calling {@link ProducerRecord#equals(Object)} compares all attributes including key, value, timestamp,
+ * topic, partition, and headers.
+ * If you only want to compare key and value (and maybe timestamp), using {@link OutputVerifier} instead of
+ * {@link ProducerRecord#equals(Object)} can simplify your code as you can ignore attributes you are not interested in.
+ * <p>
+ * Note, that calling {@code pipeInput()} will also trigger {@link PunctuationType#STREAM_TIME event-time} base
+ * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) punctuation} callbacks.
+ * However, you won't trigger {@link PunctuationType#WALL_CLOCK_TIME wall-clock} type punctuations that you must
+ * trigger manually via {@link #advanceWallClockTime(long)}.
+ * <p>
+ * Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
+ * {@link org.apache.kafka.streams.processor.Processor processors}.
+ *
+ * <h2>Processor state</h2>
+ * <p>
+ * Some processors use Kafka {@link StateStore state storage}, so this driver class provides the generic
+ * {@link #getStateStore(String)} as well as store-type specific methods so that your tests can check the underlying
+ * state store(s) used by your topology's processors.
+ * In our previous example, after we supplied a single input message and checked the three output messages, our test
+ * could also check the key value store to verify the processor correctly added, removed, or updated internal state.
+ * Or, our test might have pre-populated some state <em>before</em> submitting the input message, and verified afterward
+ * that the processor(s) correctly updated the state.
+ *
+ * @see ConsumerRecordFactory
+ * @see OutputVerifier
+ */
+@InterfaceStability.Evolving
+public class TopologyTestDriver {
+
+ private final Time mockTime;
+ private final InternalTopologyBuilder internalTopologyBuilder;
+
+ private final static int PARTITION_ID = 0;
+ private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
+ private StreamTask task;
+ private GlobalStateUpdateTask globalStateTask;
+
+ private final ProcessorTopology processorTopology;
+ private final MockProducer<byte[], byte[]> producer;
+
+ private final Set<String> internalTopics = new HashSet<>();
+ private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
+ private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
+ private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
+
+ private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
+
+ /**
+ * Create a new test diver instance.
+ * Initialized the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}.
+ *
+ * @param topology the topology to be tested
+ * @param config the configuration for the topology
+ */
+ public TopologyTestDriver(final Topology topology,
+ final Properties config) {
+ this(topology, config, System.currentTimeMillis());
+ }
+ /**
+ * Create a new test diver instance.
+ *
+ * @param topology the topology to be tested
+ * @param config the configuration for the topology
+ * @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
+ */
+ public TopologyTestDriver(final Topology topology,
+ final Properties config,
+ final long initialWallClockTimeMs) {
+ final StreamsConfig streamsConfig = new StreamsConfig(config);
+ mockTime = new MockTime(initialWallClockTimeMs);
+
+ internalTopologyBuilder = topology.internalTopologyBuilder;
+ internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+
+ processorTopology = internalTopologyBuilder.build(null);
+ final ProcessorTopology globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+ final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
+ producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
+ @Override
+ public List<PartitionInfo> partitionsFor(final String topic) {
+ return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
+ }
+ };
+
+ final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ final StateDirectory stateDirectory = new StateDirectory(streamsConfig, mockTime);
+ final StreamsMetrics streamsMetrics = new StreamsMetricsImpl(
+ new Metrics(),
+ "topology-test-driver-stream-metrics",
+ Collections.<String, String>emptyMap());
+ final ThreadCache cache = new ThreadCache(
+ new LogContext("topology-test-driver "),
+ Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+ streamsMetrics);
+ final StateRestoreListener stateRestoreListener = new StateRestoreListener() {
+ @Override
+ public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {}
+
+ @Override
+ public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {}
+
+ @Override
+ public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {}
+ };
+
+ for (final InternalTopologyBuilder.TopicsInfo topicsInfo : internalTopologyBuilder.topicGroups().values()) {
+ internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+ }
+
+ for (final String topic : processorTopology.sourceTopics()) {
+ final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
+ partitionsByTopic.put(topic, tp);
+ offsetsByTopicPartition.put(tp, new AtomicLong());
+ }
+ consumer.assign(partitionsByTopic.values());
+
+ if (globalTopology != null) {
+ for (final String topicName : globalTopology.sourceTopics()) {
+ final TopicPartition partition = new TopicPartition(topicName, 0);
+ globalPartitionsByTopic.put(topicName, partition);
+ offsetsByTopicPartition.put(partition, new AtomicLong());
+ consumer.updatePartitions(topicName, Collections.singletonList(
+ new PartitionInfo(topicName, 0, null, null, null)));
+ consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
+ consumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
+ }
+
+ final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(
+ new LogContext("mock "),
+ globalTopology,
+ consumer,
+ stateDirectory,
+ stateRestoreListener,
+ streamsConfig);
+
+ final GlobalProcessorContextImpl globalProcessorContext
+ = new GlobalProcessorContextImpl(streamsConfig, stateManager, streamsMetrics, cache);
+ stateManager.setGlobalProcessorContext(globalProcessorContext);
+
+ globalStateTask = new GlobalStateUpdateTask(
+ globalTopology,
+ globalProcessorContext,
+ stateManager,
+ new LogAndContinueExceptionHandler(),
+ new LogContext());
+ globalStateTask.initialize();
+ }
+
+ if (!partitionsByTopic.isEmpty()) {
+ task = new StreamTask(
+ TASK_ID,
+ partitionsByTopic.values(),
+ processorTopology,
+ consumer,
+ new StoreChangelogReader(
+ createRestoreConsumer(processorTopology.storeToChangelogTopic()),
+ stateRestoreListener,
+ new LogContext("topology-test-driver ")),
+ streamsConfig,
+ streamsMetrics,
+ stateDirectory,
+ cache,
+ mockTime,
+ producer);
+ task.initializeStateStores();
+ task.initializeTopology();
+ }
+ }
+
+ /**
+ * Send an input message with the given key, value, and timestamp on the specified topic to the topology and then
+ * commit the messages.
+ *
+ * @param consumerRecord the record to be processed
+ */
+ public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+ final String topicName = consumerRecord.topic();
+
+ final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+ if (topicPartition != null) {
+ final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
+ task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
+ topicName,
+ topicPartition.partition(),
+ offset,
+ consumerRecord.timestamp(),
+ consumerRecord.timestampType(),
+ consumerRecord.checksum(),
+ consumerRecord.serializedKeySize(),
+ consumerRecord.serializedValueSize(),
+ consumerRecord.key(),
+ consumerRecord.value())));
+ producer.clear();
+
+ // Process the record ...
+ ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName));
+ task.process();
+ task.maybePunctuateStreamTime();
+ task.commit();
+
+ // Capture all the records sent to the producer ...
+ for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
+ Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
+ if (outputRecords == null) {
+ outputRecords = new LinkedList<>();
+ outputRecordsByTopic.put(record.topic(), outputRecords);
+ }
+ outputRecords.add(record);
+
+ // Forward back into the topology if the produced record is to an internal or a source topic ...
+ final String outputTopicName = record.topic();
+ if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) {
+ final byte[] serializedKey = record.key();
+ final byte[] serializedValue = record.value();
+
+ pipeInput(new ConsumerRecord<>(
+ outputTopicName,
+ -1,
+ -1L,
+ record.timestamp(),
+ TimestampType.CREATE_TIME,
+ 0L,
+ serializedKey == null ? 0 : serializedKey.length,
+ serializedValue == null ? 0 : serializedValue.length,
+ serializedKey,
+ serializedValue));
+ }
+ }
+ } else {
+ final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
+ if (globalTopicPartition == null) {
+ throw new IllegalArgumentException("Unknown topic: " + topicName);
+ }
+ final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1;
+ globalStateTask.update(new ConsumerRecord<>(
+ globalTopicPartition.topic(),
+ globalTopicPartition.partition(),
+ offset,
+ consumerRecord.timestamp(),
+ consumerRecord.timestampType(),
+ consumerRecord.checksum(),
+ consumerRecord.serializedKeySize(),
+ consumerRecord.serializedValueSize(),
+ consumerRecord.key(),
+ consumerRecord.value()));
+ globalStateTask.flushState();
+ }
+ }
+
+ /**
+ * Send input messages to the topology and then commit each message individually.
+ *
+ * @param records a list of records to be processed
+ */
+ public void pipeInput(final List<ConsumerRecord<byte[], byte[]>> records) {
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ pipeInput(record);
+ }
+ }
+
+ /**
+ * Advances the internally mocked wall-clock time.
+ * This might trigger a {@link PunctuationType#WALL_CLOCK_TIME wall-clock} type
+ * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) punctuations}.
+ *
+ * @param advanceMs the amount of time to advance wall-clock time in milliseconds
+ */
+ public void advanceWallClockTime(final long advanceMs) {
+ mockTime.sleep(advanceMs);
+ task.maybePunctuateSystemTime();
+ task.commit();
+ }
+
+ /**
+ * Read the next record from the given topic.
+ * These records were output by the topology during the previous calls to {@link #pipeInput(ConsumerRecord)}.
+ *
+ * @param topic the name of the topic
+ * @return the next record on that topic, or {@code null} if there is no record available
+ */
+ public ProducerRecord<byte[], byte[]> readOutput(final String topic) {
+ final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
+ if (outputRecords == null) {
+ return null;
+ }
+ return outputRecords.poll();
+ }
+
+ /**
+ * Read the next record from the given topic.
+ * These records were output by the topology during the previous calls to {@link #pipeInput(ConsumerRecord)}.
+ *
+ * @param topic the name of the topic
+ * @param keyDeserializer the deserializer for the key type
+ * @param valueDeserializer the deserializer for the value type
+ * @return the next record on that topic, or {@code null} if there is no record available
+ */
+ public <K, V> ProducerRecord<K, V> readOutput(final String topic,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer) {
+ final ProducerRecord<byte[], byte[]> record = readOutput(topic);
+ if (record == null) {
+ return null;
+ }
+ final K key = keyDeserializer.deserialize(record.topic(), record.key());
+ final V value = valueDeserializer.deserialize(record.topic(), record.value());
+ return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
+ }
+
+ /**
+ * Get all {@link StateStore StateStores} from the topology.
+ * The stores can be a "regular" or global stores.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+ * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ *
+ * @return all stores my name
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getWindowStore(String)
+ * @see #getSessionStore(String)
+ */
+ public Map<String, StateStore> getAllStateStores() {
+ final Map<String, StateStore> allStores = new HashMap<>();
+ for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
+ allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
+ }
+ return allStores;
+ }
+
+ /**
+ * Get the {@link StateStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+ * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the state store, or {@code null} if no store has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getKeyValueStore(String)
+ * @see #getWindowStore(String)
+ * @see #getSessionStore(String)
+ */
+ public StateStore getStateStore(final String name) {
+ return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+ }
+
+ /**
+ * Get the {@link KeyValueStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+ * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the key value store, or {@code null} if no {@link KeyValueStore} has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getWindowStore(String)
+ * @see #getSessionStore(String)
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
+ final StateStore store = getStateStore(name);
+ return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
+ }
+
+ /**
+ * Get the {@link WindowStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+ * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the key value store, or {@code null} if no {@link WindowStore} has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getSessionStore(String) (String)
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> WindowStore<K, V> getWindowStore(final String name) {
+ final StateStore store = getStateStore(name);
+ return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
+ }
+
+ /**
+ * Get the {@link SessionStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+ * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the key value store, or {@code null} if no {@link SessionStore} has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getWindowStore(String)
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> SessionStore<K, V> getSessionStore(final String name) {
+ final StateStore store = getStateStore(name);
+ return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
+ }
+
+ /**
+ * Close the driver, its topology, and all processors.
+ */
+ public void close() {
+ if (task != null) {
+ task.close(true, false);
+ }
+ if (globalStateTask != null) {
+ try {
+ globalStateTask.close();
+ } catch (final IOException e) {
+ // ignore
+ }
+ }
+ }
+
+ static class MockTime implements Time {
+ private final AtomicLong timeMs;
+ private final AtomicLong highResTimeNs;
+
+ MockTime(final long startTimestampMs) {
+ this.timeMs = new AtomicLong(startTimestampMs);
+ this.highResTimeNs = new AtomicLong(startTimestampMs * 1000L * 1000L);
+ }
+
+ @Override
+ public long milliseconds() {
+ return timeMs.get();
+ }
+
+ @Override
+ public long nanoseconds() {
+ return highResTimeNs.get();
+ }
+
+ @Override
+ public long hiResClockMs() {
+ return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
+ }
+
+ @Override
+ public void sleep(final long ms) {
+ if (ms < 0) {
+ throw new IllegalArgumentException("Sleep ms cannot be negative.");
+ }
+ timeMs.addAndGet(ms);
+ highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
+ }
+ }
+
+ private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
+ final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
+ @Override
+ public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
+
+ @Override
+ public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
+
+ @Override
+ public synchronized long position(final TopicPartition partition) {
+ return 0L;
+ }
+ };
+
+ // for each store
+ for (final Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) {
+ final String topicName = storeAndTopic.getValue();
+ // Set up the restore-state topic ...
+ // consumer.subscribe(new TopicPartition(topicName, 0));
+ // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
+ final List<PartitionInfo> partitionInfos = new ArrayList<>();
+ partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
+ consumer.updatePartitions(topicName, partitionInfos);
+ consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
+ }
+ return consumer;
+ }
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
new file mode 100644
index 0000000..ea8d632
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.TopologyTestDriver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Factory to create {@link ConsumerRecord consumer records} for a single single-partitioned topic with given key and
+ * value {@link Serializer serializers}.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ *
+ * @see TopologyTestDriver
+ */
+@InterfaceStability.Evolving
+public class ConsumerRecordFactory<K, V> {
+ private final String topicName;
+ private final Serializer<K> keySerializer;
+ private final Serializer<V> valueSerializer;
+ private long timeMs;
+ private long advanceMs;
+
+ /**
+ * Create a new factory for the given topic.
+ * Uses current system time as start timestamp.
+ * Auto-advance is disabled.
+ *
+ * @param keySerializer the key serializer
+ * @param valueSerializer the value serializer
+ */
+ public ConsumerRecordFactory(final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
+ this(null, keySerializer, valueSerializer, System.currentTimeMillis());
+ }
+
+ /**
+ * Create a new factory for the given topic.
+ * Uses current system time as start timestamp.
+ * Auto-advance is disabled.
+ *
+ * @param defaultTopicName the default topic name used for all generated {@link ConsumerRecord consumer records}
+ * @param keySerializer the key serializer
+ * @param valueSerializer the value serializer
+ */
+ public ConsumerRecordFactory(final String defaultTopicName,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer) {
+ this(defaultTopicName, keySerializer, valueSerializer, System.currentTimeMillis());
+ }
+
+ /**
+ * Create a new factory for the given topic.
+ * Auto-advance is disabled.
+ *
+ * @param keySerializer the key serializer
+ * @param valueSerializer the value serializer
+ * @param startTimestampMs the initial timestamp for generated records
+ */
+ public ConsumerRecordFactory(final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final long startTimestampMs) {
+ this(null, keySerializer, valueSerializer, startTimestampMs, 0L);
+ }
+
+ /**
+ * Create a new factory for the given topic.
+ * Auto-advance is disabled.
+ *
+ * @param defaultTopicName the topic name used for all generated {@link ConsumerRecord consumer records}
+ * @param keySerializer the key serializer
+ * @param valueSerializer the value serializer
+ * @param startTimestampMs the initial timestamp for generated records
+ */
+ public ConsumerRecordFactory(final String defaultTopicName,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final long startTimestampMs) {
+ this(defaultTopicName, keySerializer, valueSerializer, startTimestampMs, 0L);
+ }
+
+ /**
+ * Create a new factory for the given topic.
+ *
+ * @param keySerializer the key serializer
+ * @param valueSerializer the value serializer
+ * @param startTimestampMs the initial timestamp for generated records
+ * @param autoAdvanceMs the time increment pre generated record
+ */
+ public ConsumerRecordFactory(final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final long startTimestampMs,
+ final long autoAdvanceMs) {
+ this(null, keySerializer, valueSerializer, startTimestampMs, autoAdvanceMs);
+ }
+
+ /**
+ * Create a new factory for the given topic.
+ *
+ * @param defaultTopicName the topic name used for all generated {@link ConsumerRecord consumer records}
+ * @param keySerializer the key serializer
+ * @param valueSerializer the value serializer
+ * @param startTimestampMs the initial timestamp for generated records
+ * @param autoAdvanceMs the time increment pre generated record
+ */
+ public ConsumerRecordFactory(final String defaultTopicName,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final long startTimestampMs,
+ final long autoAdvanceMs) {
+ Objects.requireNonNull(keySerializer, "keySerializer cannot be null");
+ Objects.requireNonNull(valueSerializer, "valueSerializer cannot be null");
+ this.topicName = defaultTopicName;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ timeMs = startTimestampMs;
+ advanceMs = autoAdvanceMs;
+ }
+
+ /**
+ * Advances the internally tracked time.
+ *
+ * @param advanceMs the amount of time to advance
+ */
+ public void advanceTimeMs(final long advanceMs) {
+ if (advanceMs < 0) {
+ throw new IllegalArgumentException("advanceMs must be positive");
+ }
+ timeMs += advanceMs;
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with the given topic name, key, value, and timestamp.
+ * Does not auto advance internally tracked time.
+ *
+ * @param topicName the topic name
+ * @param key the record key
+ * @param value the record value
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final K key,
+ final V value,
+ final long timestampMs) {
+ Objects.requireNonNull(topicName, "topicName cannot be null.");
+ final byte[] serializedKey = keySerializer.serialize(topicName, key);
+ final byte[] serializedValue = valueSerializer.serialize(topicName, value);
+ return new ConsumerRecord<>(
+ topicName,
+ -1,
+ -1L,
+ timestampMs,
+ TimestampType.CREATE_TIME,
+ 0L,
+ serializedKey == null ? 0 : serializedKey.length,
+ serializedValue == null ? 0 : serializedValue.length,
+ serializedKey,
+ serializedValue);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and given key, value, and timestamp.
+ * Does not auto advance internally tracked time.
+ *
+ * @param key the record key
+ * @param value the record value
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final K key,
+ final V value,
+ final long timestampMs) {
+ if (topicName == null) {
+ throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+ "Use #create(String topicName, K key, V value, long timestampMs) instead.");
+ }
+ return create(topicName, key, value, timestampMs);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with the given topic name, key, and value.
+ * The timestamp will be generated from the constructor provided and time will auto advance.
+ *
+ * @param topicName the topic name
+ * @param key the record key
+ * @param value the record value
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final K key,
+ final V value) {
+ final long timestamp = timeMs;
+ timeMs += advanceMs;
+ return create(topicName, key, value, timestamp);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and given key and value.
+ * The timestamp will be generated from the constructor provided and time will auto advance.
+ *
+ * @param key the record key
+ * @param value the record value
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final K key,
+ final V value) {
+ if (topicName == null) {
+ throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+ "Use #create(String topicName, K key, V value) instead.");
+ }
+ return create(topicName, key, value);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name, value, and timestamp.
+ *
+ * @param topicName the topic name
+ * @param value the record value
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final V value,
+ final long timestampMs) {
+ return create(topicName, null, value, timestampMs);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and {@code null}-key as well as given value and timestamp.
+ *
+ * @param value the record value
+ * @param timestampMs the record timestamp
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final V value,
+ final long timestampMs) {
+ if (topicName == null) {
+ throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+ "Use #create(String topicName, V value, long timestampMs) instead.");
+ }
+ return create(topicName, value, timestampMs);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with {@code null}-key and the given topic name and value.
+ * The timestamp will be generated from the constructor provided and time will auto advance.
+ *
+ * @param topicName the topic name
+ * @param value the record value
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final String topicName,
+ final V value) {
+ return create(topicName, null, value);
+ }
+
+ /**
+ * Create a {@link ConsumerRecord} with default topic name and {@code null}-key was well as given value.
+ * The timestamp will be generated from the constructor provided and time will auto advance.
+ *
+ * @param value the record value
+ * @return the generated {@link ConsumerRecord}
+ */
+ public ConsumerRecord<byte[], byte[]> create(final V value) {
+ if (topicName == null) {
+ throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+ "Use #create(String topicName, V value, long timestampMs) instead.");
+ }
+ return create(topicName, value);
+ }
+
+ /**
+ * Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
+ * The timestamp will be generated from the constructor provided and time will auto advance.
+ *
+ * @param topicName the topic name
+ * @param keyValues the record keys and values
+ * @return the generated {@link ConsumerRecord consumer records}
+ */
+ public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
+ final List<KeyValue<K, V>> keyValues) {
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
+
+ for (final KeyValue<K, V> keyValue : keyValues) {
+ records.add(create(topicName, keyValue.key, keyValue.value));
+ }
+
+ return records;
+ }
+
+ /**
+ * Creates {@link ConsumerRecord consumer records} with default topic name as well as given keys and values.
+ * The timestamp will be generated from the constructor provided and time will auto advance.
+ *
+ * @param keyValues the record keys and values
+ * @return the generated {@link ConsumerRecord consumer records}
+ */
+ public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues) {
+ if (topicName == null) {
+ throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+ "Use #create(String topicName, List<KeyValue<K, V>> keyValues) instead.");
+ }
+
+ return create(topicName, keyValues);
+ }
+
+ /**
+ * Creates {@link ConsumerRecord consumer records} with the given topic name, keys, and values.
+ * Does not auto advance internally tracked time.
+ *
+ * @param topicName the topic name
+ * @param keyValues the record keys and values
+ * @param startTimestamp the timestamp for the first generated record
+ * @param advanceMs the time difference between two consecutive generated records
+ * @return the generated {@link ConsumerRecord consumer records}
+ */
+ public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
+ final List<KeyValue<K, V>> keyValues,
+ final long startTimestamp,
+ final long advanceMs) {
+ if (advanceMs < 0) {
+ throw new IllegalArgumentException("advanceMs must be positive");
+ }
+
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(keyValues.size());
+
+ long timestamp = startTimestamp;
+ for (final KeyValue<K, V> keyValue : keyValues) {
+ records.add(create(topicName, keyValue.key, keyValue.value, timestamp));
+ timestamp += advanceMs;
+ }
+
+ return records;
+ }
+
+ /**
+ * Creates {@link ConsumerRecord consumer records} with default topic name as well as given keys and values.
+ * Does not auto advance internally tracked time.
+ *
+ * @param keyValues the record keys and values
+ * @param startTimestamp the timestamp for the first generated record
+ * @param advanceMs the time difference between two consecutive generated records
+ * @return the generated {@link ConsumerRecord consumer records}
+ */
+ public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
+ final long startTimestamp,
+ final long advanceMs) {
+ if (topicName == null) {
+ throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+ "Use #create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs) instead.");
+ }
+
+ return create(topicName, keyValues, startTimestamp, advanceMs);
+ }
+
+ /**
+ * Creates {@link ConsumerRecord consumer records} with the given topic name, keys and values.
+ * For each generated record, the time is advanced by 1.
+ * Does not auto advance internally tracked time.
+ *
+ * @param topicName the topic name
+ * @param keyValues the record keys and values
+ * @param startTimestamp the timestamp for the first generated record
+ * @return the generated {@link ConsumerRecord consumer records}
+ */
+ public List<ConsumerRecord<byte[], byte[]>> create(final String topicName,
+ final List<KeyValue<K, V>> keyValues,
+ final long startTimestamp) {
+ return create(topicName, keyValues, startTimestamp, 1);
+ }
+
+ /**
+ * Creates {@link ConsumerRecord consumer records} with the given keys and values.
+ * For each generated record, the time is advanced by 1.
+ * Does not auto advance internally tracked time.
+ *
+ * @param keyValues the record keys and values
+ * @param startTimestamp the timestamp for the first generated record
+ * @return the generated {@link ConsumerRecord consumer records}
+ */
+ public List<ConsumerRecord<byte[], byte[]>> create(final List<KeyValue<K, V>> keyValues,
+ final long startTimestamp) {
+ if (topicName == null) {
+ throw new IllegalStateException("ConsumerRecordFactory was created without defaultTopicName. " +
+ "Use #create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp) instead.");
+ }
+
+ return create(topicName, keyValues, startTimestamp, 1);
+ }
+
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
new file mode 100644
index 0000000..09ed294
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/OutputVerifier.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.TopologyTestDriver;
+
+import java.util.Objects;
+
+/**
+ * Helper class to verify topology result records.
+ *
+ * @see TopologyTestDriver
+ */
+public class OutputVerifier {
+
+ /**
+ * Compares a {@link ProducerRecord} with the provided value and throws an {@link AssertionError} if the
+ * {@code ProducerRecord}'s value is not equal to the expected value.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedValue the expected value of the {@code ProducerRecord}
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedValue}
+ */
+ public static <K, V> void compareValue(final ProducerRecord<K, V> record,
+ final V expectedValue) throws AssertionError {
+ Objects.requireNonNull(record);
+
+ final V recordValue = record.value();
+ final AssertionError error = new AssertionError("Expected value=" + expectedValue + " but was value=" + recordValue);
+
+ if (recordValue != null) {
+ if (!recordValue.equals(expectedValue)) {
+ throw error;
+ }
+ } else if (expectedValue != null) {
+ throw error;
+ }
+ }
+
+ /**
+ * Compares the values of two {@link ProducerRecord}'s and throws an {@link AssertionError} if they are not equal to
+ * each other.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedRecord a {@code ProducerRecord} for verification
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s value is not equal to {@code expectedRecord}'s value
+ */
+ public static <K, V> void compareValue(final ProducerRecord<K, V> record,
+ final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+ Objects.requireNonNull(expectedRecord);
+ compareValue(record, expectedRecord.value());
+ }
+
+ /**
+ * Compares a {@link ProducerRecord} with the provided key and value and throws an {@link AssertionError} if the
+ * {@code ProducerRecord}'s key or value is not equal to the expected key or value.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedKey the expected key of the {@code ProducerRecord}
+ * @param expectedValue the expected value of the {@code ProducerRecord}
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedKey} or {@code expectedValue}
+ */
+ public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
+ final K expectedKey,
+ final V expectedValue) throws AssertionError {
+ Objects.requireNonNull(record);
+
+ final K recordKey = record.key();
+ final V recordValue = record.value();
+ final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> " +
+ "but was <" + recordKey + ", " + recordValue + ">");
+
+ if (recordKey != null) {
+ if (!recordKey.equals(expectedKey)) {
+ throw error;
+ }
+ } else if (expectedKey != null) {
+ throw error;
+ }
+
+ if (recordValue != null) {
+ if (!recordValue.equals(expectedValue)) {
+ throw error;
+ }
+ } else if (expectedValue != null) {
+ throw error;
+ }
+ }
+
+ /**
+ * Compares the keys and values of two {@link ProducerRecord}'s and throws an {@link AssertionError} if the keys or
+ * values are not equal to each other.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedRecord a {@code ProducerRecord} for verification
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key or value is not equal to {@code expectedRecord}'s key or value
+ */
+ public static <K, V> void compareKeyValue(final ProducerRecord<K, V> record,
+ final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+ Objects.requireNonNull(expectedRecord);
+ compareKeyValue(record, expectedRecord.key(), expectedRecord.value());
+ }
+
+ /**
+ * Compares a {@link ProducerRecord} with the provided value and timestamp and throws an {@link AssertionError} if
+ * the {@code ProducerRecord}'s value or timestamp is not equal to the expected value or timestamp.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedValue the expected value of the {@code ProducerRecord}
+ * @param expectedTimestamp the expected timestamps of the {@code ProducerRecord}
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedValue} or {@code expectedTimestamp}
+ */
+ public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
+ final V expectedValue,
+ final long expectedTimestamp) throws AssertionError {
+ Objects.requireNonNull(record);
+
+ final V recordValue = record.value();
+ final long recordTimestamp = record.timestamp();
+ final AssertionError error = new AssertionError("Expected value=" + expectedValue + " with timestamp=" + expectedTimestamp +
+ " but was value=" + recordValue + " with timestamp=" + recordTimestamp);
+
+ if (recordValue != null) {
+ if (!recordValue.equals(expectedValue)) {
+ throw error;
+ }
+ } else if (expectedValue != null) {
+ throw error;
+ }
+
+ if (recordTimestamp != expectedTimestamp) {
+ throw error;
+ }
+ }
+
+ /**
+ * Compares the values and timestamps of two {@link ProducerRecord}'s and throws an {@link AssertionError} if the
+ * values or timestamps are not equal to each other.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedRecord a {@code ProducerRecord} for verification
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s value or timestamp is not equal to {@code expectedRecord}'s value or timestamp
+ */
+ public static <K, V> void compareValueTimestamp(final ProducerRecord<K, V> record,
+ final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+ Objects.requireNonNull(expectedRecord);
+ compareValueTimestamp(record, expectedRecord.value(), expectedRecord.timestamp());
+ }
+
+ /**
+ * Compares a {@link ProducerRecord} with the provided key, value, and timestamp and throws an
+ * {@link AssertionError} if the {@code ProducerRecord}'s key, value, or timestamp is not equal to the expected key,
+ * value, or timestamp.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedKey the expected key of the {@code ProducerRecord}
+ * @param expectedValue the expected value of the {@code ProducerRecord}
+ * @param expectedTimestamp the expected timestamp of the {@code ProducerRecord}
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key, value, timestamp is not equal to {@code expectedKey},
+ * {@code expectedValue}, or {@code expectedTimestamps}
+ */
+ public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
+ final K expectedKey,
+ final V expectedValue,
+ final long expectedTimestamp) throws AssertionError {
+ Objects.requireNonNull(record);
+
+ final K recordKey = record.key();
+ final V recordValue = record.value();
+ final long recordTimestamp = record.timestamp();
+ final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
+ " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
+
+ if (recordKey != null) {
+ if (!recordKey.equals(expectedKey)) {
+ throw error;
+ }
+ } else if (expectedKey != null) {
+ throw error;
+ }
+
+ if (recordValue != null) {
+ if (!recordValue.equals(expectedValue)) {
+ throw error;
+ }
+ } else if (expectedValue != null) {
+ throw error;
+ }
+
+ if (recordTimestamp != expectedTimestamp) {
+ throw error;
+ }
+ }
+
+ /**
+ * Compares the keys, values, and timestamps of two {@link ProducerRecord}'s and throws an {@link AssertionError} if
+ * the keys, values, or timestamps are not equal to each other.
+ *
+ * @param record a output {@code ProducerRecord} for verification
+ * @param expectedRecord a {@code ProducerRecord} for verification
+ * @param <K> the key type
+ * @param <V> the value type
+ * @throws AssertionError if {@code ProducerRecord}'s key, value, or timestamp is not equal to
+ * {@code expectedRecord}'s key, value, or timestamp
+ */
+ public static <K, V> void compareKeyValueTimestamp(final ProducerRecord<K, V> record,
+ final ProducerRecord<K, V> expectedRecord) throws AssertionError {
+ Objects.requireNonNull(expectedRecord);
+ compareKeyValueTimestamp(record, expectedRecord.key(), expectedRecord.value(), expectedRecord.timestamp());
+ }
+
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockTimeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockTimeTest.java
new file mode 100644
index 0000000..bc11b70
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockTimeTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockTimeTest {
+
+ @Test
+ public void shouldSetStartTime() {
+ final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
+ assertEquals(42L, time.milliseconds());
+ assertEquals(42L * 1000L * 1000L, time.nanoseconds());
+ }
+
+ @Test
+ public void shouldGetNanosAsMillis() {
+ final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
+ assertEquals(42L, time.hiResClockMs());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldNotAllowNegativeSleep() {
+ new TopologyTestDriver.MockTime(42).sleep(-1L);
+ }
+
+ @Test
+ public void shouldAdvanceTimeOnSleep() {
+ final TopologyTestDriver.MockTime time = new TopologyTestDriver.MockTime(42L);
+
+ assertEquals(42L, time.milliseconds());
+ time.sleep(1L);
+ assertEquals(43L, time.milliseconds());
+ time.sleep(0L);
+ assertEquals(43L, time.milliseconds());
+ time.sleep(3L);
+ assertEquals(46L, time.milliseconds());
+ }
+
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
new file mode 100644
index 0000000..4100f18
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TopologyTestDriverTest {
+ private final static String SOURCE_TOPIC_1 = "source-topic-1";
+ private final static String SOURCE_TOPIC_2 = "source-topic-2";
+ private final static String SINK_TOPIC_1 = "sink-topic-1";
+ private final static String SINK_TOPIC_2 = "sink-topic-2";
+
+ private final ConsumerRecordFactory<byte[], byte[]> consumerRecordFactory = new ConsumerRecordFactory<>(
+ new ByteArraySerializer(),
+ new ByteArraySerializer());
+
+ private final byte[] key1 = new byte[0];
+ private final byte[] value1 = new byte[0];
+ private final long timestamp1 = 42L;
+ private final ConsumerRecord<byte[], byte[]> consumerRecord1 = consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, timestamp1);
+
+ private final byte[] key2 = new byte[0];
+ private final byte[] value2 = new byte[0];
+ private final long timestamp2 = 43L;
+ private final ConsumerRecord<byte[], byte[]> consumerRecord2 = consumerRecordFactory.create(SOURCE_TOPIC_2, key2, value2, timestamp2);
+
+ private TopologyTestDriver testDriver;
+ private final Properties config = new Properties() {
+ {
+ put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver");
+ put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+ put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+ }
+ };
+
+ private final static class Record {
+ private Object key;
+ private Object value;
+ private long timestamp;
+ private long offset;
+ private String topic;
+
+ Record(final ConsumerRecord consumerRecord) {
+ key = consumerRecord.key();
+ value = consumerRecord.value();
+ timestamp = consumerRecord.timestamp();
+ offset = consumerRecord.offset();
+ topic = consumerRecord.topic();
+ }
+
+ Record(final Object key,
+ final Object value,
+ final long timestamp,
+ final long offset,
+ final String topic) {
+ this.key = key;
+ this.value = value;
+ this.timestamp = timestamp;
+ this.offset = offset;
+ this.topic = topic;
+ }
+
+ @Override
+ public String toString() {
+ return "key: " + key + ", value: " + value + ", timestamp: " + timestamp + ", offset: " + offset + ", topic: " + topic;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Record record = (Record) o;
+ return timestamp == record.timestamp &&
+ offset == record.offset &&
+ Objects.equals(key, record.key) &&
+ Objects.equals(value, record.value) &&
+ Objects.equals(topic, record.topic);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value, timestamp, offset, topic);
+ }
+ }
+
+ private final static class Punctuation {
+ private final long intervalMs;
+ private final PunctuationType punctuationType;
+ private final Punctuator callback;
+
+ Punctuation(final long intervalMs,
+ final PunctuationType punctuationType,
+ final Punctuator callback) {
+ this.intervalMs = intervalMs;
+ this.punctuationType = punctuationType;
+ this.callback = callback;
+ }
+ }
+
+ private final class MockPunctuator implements Punctuator {
+ private final List<Long> punctuatedAt = new LinkedList<>();
+
+ @Override
+ public void punctuate(long timestamp) {
+ punctuatedAt.add(timestamp);
+ }
+ }
+
+ private final class MockProcessor implements Processor {
+ private final Collection<Punctuation> punctuations;
+ private ProcessorContext context;
+
+ private boolean initialized = false;
+ private boolean closed = false;
+ private final List<Record> processedRecords = new ArrayList<>();
+
+ MockProcessor() {
+ this(Collections.<Punctuation>emptySet());
+ }
+
+ MockProcessor(final Collection<Punctuation> punctuations) {
+ this.punctuations = punctuations;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ initialized = true;
+ this.context = context;
+ for (final Punctuation punctuation : punctuations) {
+ this.context.schedule(punctuation.intervalMs, punctuation.punctuationType, punctuation.callback);
+ }
+ }
+
+ @Override
+ public void process(Object key, Object value) {
+ processedRecords.add(new Record(key, value, context.timestamp(), context.offset(), context.topic()));
+ context.forward(key, value);
+ }
+
+ @Override
+ public void punctuate(long timestamp) {} // deprecated
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+ }
+
+ private final List<MockProcessor> mockProcessors = new ArrayList<>();
+
+ private final class MockProcessorSupplier implements ProcessorSupplier {
+ private final Collection<Punctuation> punctuations;
+
+ private MockProcessorSupplier() {
+ this(Collections.<Punctuation>emptySet());
+ }
+
+ private MockProcessorSupplier(final Collection<Punctuation> punctuations) {
+ this.punctuations = punctuations;
+ }
+
+ @Override
+ public Processor get() {
+ final MockProcessor mockProcessor = new MockProcessor(punctuations);
+ mockProcessors.add(mockProcessor);
+ return mockProcessor;
+ }
+ }
+
+ @After
+ public void tearDown() {
+ testDriver.close();
+ }
+
+ private Topology setupSourceSinkTopology() {
+ final Topology topology = new Topology();
+
+ final String sourceName = "source";
+
+ topology.addSource(sourceName, SOURCE_TOPIC_1);
+ topology.addSink("sink", SINK_TOPIC_1, sourceName);
+
+ return topology;
+ }
+
+ private Topology setupTopologyWithTwoSubtopologies() {
+ final Topology topology = new Topology();
+
+ final String sourceName1 = "source-1";
+ final String sourceName2 = "source-2";
+
+ topology.addSource(sourceName1, SOURCE_TOPIC_1);
+ topology.addSink("sink-1", SINK_TOPIC_1, sourceName1);
+ topology.addSource(sourceName2, SINK_TOPIC_1);
+ topology.addSink("sink-2", SINK_TOPIC_2, sourceName2);
+
+ return topology;
+ }
+
+
+ private Topology setupSingleProcessorTopology() {
+ return setupSingleProcessorTopology(-1, null, null);
+ }
+
+ private Topology setupSingleProcessorTopology(final long punctuationIntervalMs,
+ final PunctuationType punctuationType,
+ final Punctuator callback) {
+ final Collection<Punctuation> punctuations;
+ if (punctuationIntervalMs > 0 && punctuationType != null && callback != null) {
+ punctuations = Collections.singleton(new Punctuation(punctuationIntervalMs, punctuationType, callback));
+ } else {
+ punctuations = Collections.emptySet();
+ }
+
+ final Topology topology = new Topology();
+
+ final String sourceName = "source";
+
+ topology.addSource(sourceName, SOURCE_TOPIC_1);
+ topology.addProcessor("processor", new MockProcessorSupplier(punctuations), sourceName);
+
+ return topology;
+ }
+
+ private Topology setupMultipleSourceTopology(final String... sourceTopicNames) {
+ final Topology topology = new Topology();
+
+ final String[] processorNames = new String[sourceTopicNames.length];
+ int i = 0;
+ for (final String sourceTopicName : sourceTopicNames) {
+ final String sourceName = sourceTopicName + "-source";
+ final String processorName = sourceTopicName + "-processor";
+ topology.addSource(sourceName, sourceTopicName);
+ processorNames[i++] = processorName;
+ topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName);
+ }
+ topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
+
+ return topology;
+ }
+
+ private Topology setupGlobalStoreTopology(final String... sourceTopicNames) {
+ if (sourceTopicNames.length == 0) {
+ throw new IllegalArgumentException("sourceTopicNames cannot be empty");
+ }
+ final Topology topology = new Topology();
+
+ for (final String sourceTopicName : sourceTopicNames) {
+ topology.addGlobalStore(
+ Stores.<Bytes, byte[]>keyValueStoreBuilder(Stores.inMemoryKeyValueStore(sourceTopicName + "-globalStore"), null, null).withLoggingDisabled(),
+ sourceTopicName,
+ null,
+ null,
+ sourceTopicName,
+ sourceTopicName + "-processor",
+ new MockProcessorSupplier()
+ );
+ }
+
+ return topology;
+ }
+
+ @Test
+ public void shouldInitProcessor() {
+ testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+ assertTrue(mockProcessors.get(0).initialized);
+ }
+
+ @Test
+ public void shouldCloseProcessor() {
+ testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+
+ testDriver.close();
+ assertTrue(mockProcessors.get(0).closed);
+ }
+
+ @Test
+ public void shouldThrowForUnknownTopic() {
+ final String unknownTopic = "unknownTopic";
+ final ConsumerRecordFactory<byte[], byte[]> consumerRecordFactory = new ConsumerRecordFactory<>(
+ "unknownTopic",
+ new ByteArraySerializer(),
+ new ByteArraySerializer());
+
+ testDriver = new TopologyTestDriver(new Topology(), config);
+ try {
+ testDriver.pipeInput(consumerRecordFactory.create((byte[]) null));
+ fail("Should have throw IllegalArgumentException");
+ } catch (final IllegalArgumentException exception) {
+ assertEquals("Unknown topic: " + unknownTopic, exception.getMessage());
+ }
+ }
+
+ @Test
+ public void shouldProcessRecordForTopic() {
+ testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+
+ testDriver.pipeInput(consumerRecord1);
+ final ProducerRecord outputRecord = testDriver.readOutput(SINK_TOPIC_1);
+
+ assertEquals(key1, outputRecord.key());
+ assertEquals(value1, outputRecord.value());
+ assertEquals(SINK_TOPIC_1, outputRecord.topic());
+ }
+
+ @Test
+ public void shouldSetRecordMetadata() {
+ testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
+
+ testDriver.pipeInput(consumerRecord1);
+
+ final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
+ assertEquals(1, processedRecords.size());
+
+ final Record record = processedRecords.get(0);
+ final Record expectedResult = new Record(consumerRecord1);
+ expectedResult.offset = 0L;
+
+ assertThat(record, equalTo(expectedResult));
+ }
+
+ @Test
+ public void shouldSendRecordViaCorrectSourceTopic() {
+ testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
+
+ final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
+ final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+
+ testDriver.pipeInput(consumerRecord1);
+
+ assertEquals(1, processedRecords1.size());
+ assertEquals(0, processedRecords2.size());
+
+ Record record = processedRecords1.get(0);
+ Record expectedResult = new Record(consumerRecord1);
+ expectedResult.offset = 0L;
+ assertThat(record, equalTo(expectedResult));
+
+ testDriver.pipeInput(consumerRecord2);
+
+ assertEquals(1, processedRecords1.size());
+ assertEquals(1, processedRecords2.size());
+
+ record = processedRecords2.get(0);
+ expectedResult = new Record(consumerRecord2);
+ expectedResult.offset = 0L;
+ assertThat(record, equalTo(expectedResult));
+ }
+
+ @Test
+ public void shouldUseSourceSpecificDeserializers() {
+ final Topology topology = new Topology();
+
+ final String sourceName1 = "source-1";
+ final String sourceName2 = "source-2";
+ final String processor = "processor";
+
+ topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
+ topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
+ topology.addProcessor(processor, new MockProcessorSupplier(), sourceName1, sourceName2);
+ topology.addSink(
+ "sink",
+ SINK_TOPIC_1,
+ new Serializer() {
+ @Override
+ public byte[] serialize(String topic, Object data) {
+ if (data instanceof Long) {
+ return Serdes.Long().serializer().serialize(topic, (Long) data);
+ }
+ return Serdes.Integer().serializer().serialize(topic, (Integer) data);
+ }
+ @Override
+ public void close() {}
+ @Override
+ public void configure(Map configs, boolean isKey) {}
+ },
+ new Serializer() {
+ @Override
+ public byte[] serialize(String topic, Object data) {
+ if (data instanceof String) {
+ return Serdes.String().serializer().serialize(topic, (String) data);
+ }
+ return Serdes.Double().serializer().serialize(topic, (Double) data);
+ }
+ @Override
+ public void close() {}
+ @Override
+ public void configure(Map configs, boolean isKey) {}
+ },
+ processor);
+
+ testDriver = new TopologyTestDriver(topology, config);
+
+ final ConsumerRecordFactory<Long, String> source1Factory = new ConsumerRecordFactory<>(
+ SOURCE_TOPIC_1,
+ Serdes.Long().serializer(),
+ Serdes.String().serializer());
+ final ConsumerRecordFactory<Integer, Double> source2Factory = new ConsumerRecordFactory<>(
+ SOURCE_TOPIC_2,
+ Serdes.Integer().serializer(),
+ Serdes.Double().serializer());
+
+ final Long source1Key = 42L;
+ final String source1Value = "anyString";
+ final Integer source2Key = 73;
+ final Double source2Value = 3.14;
+
+ final ConsumerRecord<byte[], byte[]> consumerRecord1 = source1Factory.create(source1Key, source1Value);
+ final ConsumerRecord<byte[], byte[]> consumerRecord2 = source2Factory.create(source2Key, source2Value);
+
+ testDriver.pipeInput(consumerRecord1);
+ OutputVerifier.compareKeyValue(
+ testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()),
+ source1Key,
+ source1Value);
+
+ testDriver.pipeInput(consumerRecord2);
+ OutputVerifier.compareKeyValue(
+ testDriver.readOutput(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer()),
+ source2Key,
+ source2Value);
+ }
+
+ @Test
+ public void shouldUseSinkeSpecificSerializers() {
+ final Topology topology = new Topology();
+
+ final String sourceName1 = "source-1";
+ final String sourceName2 = "source-2";
+
+ topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1);
+ topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2);
+ topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), sourceName1);
+ topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), sourceName2);
+
+ testDriver = new TopologyTestDriver(topology, config);
+
+ final ConsumerRecordFactory<Long, String> source1Factory = new ConsumerRecordFactory<>(
+ SOURCE_TOPIC_1,
+ Serdes.Long().serializer(),
+ Serdes.String().serializer());
+ final ConsumerRecordFactory<Integer, Double> source2Factory = new ConsumerRecordFactory<>(
+ SOURCE_TOPIC_2,
+ Serdes.Integer().serializer(),
+ Serdes.Double().serializer());
+
+ final Long source1Key = 42L;
+ final String source1Value = "anyString";
+ final Integer source2Key = 73;
+ final Double source2Value = 3.14;
+
+ final ConsumerRecord<byte[], byte[]> consumerRecord1 = source1Factory.create(source1Key, source1Value);
+ final ConsumerRecord<byte[], byte[]> consumerRecord2 = source2Factory.create(source2Key, source2Value);
+
+ testDriver.pipeInput(consumerRecord1);
+ OutputVerifier.compareKeyValue(
+ testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()),
+ source1Key,
+ source1Value);
+
+ testDriver.pipeInput(consumerRecord2);
+ OutputVerifier.compareKeyValue(
+ testDriver.readOutput(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer()),
+ source2Key,
+ source2Value);
+ }
+
+ @Test
+ public void shouldProcessConsumerRecordList() {
+ testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
+
+ final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
+ final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+
+ final List<ConsumerRecord<byte[], byte[]>> testRecords = new ArrayList<>(2);
+ testRecords.add(consumerRecord1);
+ testRecords.add(consumerRecord2);
+
+ testDriver.pipeInput(testRecords);
+
+ assertEquals(1, processedRecords1.size());
+ assertEquals(1, processedRecords2.size());
+
+ Record record = processedRecords1.get(0);
+ Record expectedResult = new Record(consumerRecord1);
+ expectedResult.offset = 0L;
+ assertThat(record, equalTo(expectedResult));
+
+ record = processedRecords2.get(0);
+ expectedResult = new Record(consumerRecord2);
+ expectedResult.offset = 0L;
+ assertThat(record, equalTo(expectedResult));
+ }
+
+ @Test
+ public void shouldForwardRecordsFromSubtopologyToSubtopology() {
+ testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies(), config);
+
+ testDriver.pipeInput(consumerRecord1);
+
+ ProducerRecord outputRecord = testDriver.readOutput(SINK_TOPIC_1);
+ assertEquals(key1, outputRecord.key());
+ assertEquals(value1, outputRecord.value());
+ assertEquals(SINK_TOPIC_1, outputRecord.topic());
+
+ outputRecord = testDriver.readOutput(SINK_TOPIC_2);
+ assertEquals(key1, outputRecord.key());
+ assertEquals(value1, outputRecord.value());
+ assertEquals(SINK_TOPIC_2, outputRecord.topic());
+ }
+
+ @Test
+ public void shouldPopulateGlobalStore() {
+ testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);
+
+ testDriver.pipeInput(consumerRecord1);
+
+ final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
+ assertEquals(1, processedRecords.size());
+
+ final Record record = processedRecords.get(0);
+ final Record expectedResult = new Record(consumerRecord1);
+ expectedResult.offset = 0L;
+ assertThat(record, equalTo(expectedResult));
+ }
+
+ @Test
+ public void shouldPunctuateOnStreamsTime() {
+ final MockPunctuator mockPunctuator = new MockPunctuator();
+ testDriver = new TopologyTestDriver(
+ setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, mockPunctuator),
+ config);
+
+ final List<Long> expectedPunctuations = new LinkedList<>();
+
+ expectedPunctuations.add(42L);
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 42L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 42L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 51L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(52L);
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 52L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 61L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(65L);
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 65L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 71L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(72L);
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 72L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(95L);
+ expectedPunctuations.add(95L);
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 95L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 101L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(102L);
+ testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, 102L));
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+ }
+
+ @Test
+ public void shouldPunctuateOnWallClockTime() {
+ final MockPunctuator mockPunctuator = new MockPunctuator();
+ testDriver = new TopologyTestDriver(
+ setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, mockPunctuator),
+ config,
+ 0);
+
+ final List<Long> expectedPunctuations = new LinkedList<>();
+
+ expectedPunctuations.add(5L);
+ testDriver.advanceWallClockTime(5L);
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ testDriver.advanceWallClockTime(9L);
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(15L);
+ testDriver.advanceWallClockTime(1L);
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+
+ expectedPunctuations.add(35L);
+ expectedPunctuations.add(35L);
+ testDriver.advanceWallClockTime(20L);
+ assertThat(mockPunctuator.punctuatedAt, equalTo(expectedPunctuations));
+ }
+
+ @Test
+ public void shouldReturnAllStores() {
+ final Topology topology = setupSourceSinkTopology();
+ topology.addStateStore(
+ new KeyValueStoreBuilder<>(
+ Stores.inMemoryKeyValueStore("store"),
+ Serdes.ByteArray(),
+ Serdes.ByteArray(),
+ new SystemTime())
+ .withLoggingDisabled());
+ topology.addGlobalStore(
+ new KeyValueStoreBuilder<>(
+ Stores.inMemoryKeyValueStore("globalStore"),
+ Serdes.ByteArray(),
+ Serdes.ByteArray(),
+ new SystemTime()).withLoggingDisabled(),
+ "sourceProcessorName",
+ Serdes.ByteArray().deserializer(),
+ Serdes.ByteArray().deserializer(),
+ "globalTopicName",
+ "globalProcessorName",
+ new ProcessorSupplier() {
+ @Override
+ public Processor get() {
+ return null;
+ }
+ });
+
+ testDriver = new TopologyTestDriver(topology, config);
+
+ final Set<String> expectedStoreNames = new HashSet<>();
+ expectedStoreNames.add("store");
+ expectedStoreNames.add("globalStore");
+ assertThat(testDriver.getAllStateStores().keySet(), equalTo(expectedStoreNames));
+ }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
new file mode 100644
index 0000000..f75a069
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/ConsumerRecordFactoryTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ConsumerRecordFactoryTest {
+ private final StringSerializer stringSerializer = new StringSerializer();
+ private final IntegerSerializer integerSerializer = new IntegerSerializer();
+
+ private final String topicName = "topic";
+ private final String otherTopicName = "otherTopic";
+ private final String key = "key";
+ private final Integer value = 42;
+ private final long timestamp = 21L;
+ private final byte[] rawKey = stringSerializer.serialize(topicName, key);
+ private final byte[] rawValue = integerSerializer.serialize(topicName, value);
+
+ private final ConsumerRecordFactory<byte[], Integer> factory =
+ new ConsumerRecordFactory<>(topicName, new ByteArraySerializer(), integerSerializer, 0L);
+
+ private final ConsumerRecordFactory<byte[], Integer> defaultFactory =
+ new ConsumerRecordFactory<>(new ByteArraySerializer(), integerSerializer);
+
+ @Test
+ public void shouldAdvanceTime() {
+ factory.advanceTimeMs(3L);
+ verifyRecord(topicName, rawKey, rawValue, 3L, factory.create(topicName, rawKey, value));
+
+ factory.advanceTimeMs(2L);
+ verifyRecord(topicName, rawKey, rawValue, 5L, factory.create(topicName, rawKey, value));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowToCreateTopicWithNullTopicName() {
+ factory.create(null, rawKey, value, timestamp);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp() {
+ factory.create(null, rawKey, value);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowToCreateTopicWithNullTopicNameWithNulKey() {
+ factory.create((String) null, value, timestamp);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp() {
+ factory.create((String) null, value);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs() {
+ factory.create(null, Collections.singletonList(KeyValue.pair(rawKey, value)));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairsAndCustomTimestamps() {
+ factory.create(null, Collections.singletonList(KeyValue.pair(rawKey, value)), timestamp, 2L);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName() {
+ defaultFactory.create(rawKey, value, timestamp);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp() {
+ defaultFactory.create(rawKey, value);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey() {
+ defaultFactory.create(value, timestamp);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKeyAndDefaultTimestamp() {
+ defaultFactory.create(value);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairs() {
+ defaultFactory.create(Collections.singletonList(KeyValue.pair(rawKey, value)));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps() {
+ defaultFactory.create(Collections.singletonList(KeyValue.pair(rawKey, value)), timestamp, 2L);
+ }
+
+ @Test
+ public void shouldCreateConsumerRecordWithOtherTopicNameAndTimestamp() {
+ verifyRecord(otherTopicName, rawKey, rawValue, timestamp, factory.create(otherTopicName, rawKey, value, timestamp));
+ }
+
+ @Test
+ public void shouldCreateConsumerRecordWithTimestamp() {
+ verifyRecord(topicName, rawKey, rawValue, timestamp, factory.create(rawKey, value, timestamp));
+ }
+
+ @Test
+ public void shouldCreateConsumerRecordWithOtherTopicName() {
+ verifyRecord(otherTopicName, rawKey, rawValue, 0L, factory.create(otherTopicName, rawKey, value));
+
+ factory.advanceTimeMs(3L);
+ verifyRecord(otherTopicName, rawKey, rawValue, 3L, factory.create(otherTopicName, rawKey, value));
+ }
+
+ @Test
+ public void shouldCreateConsumerRecord() {
+ verifyRecord(topicName, rawKey, rawValue, 0L, factory.create(rawKey, value));
+
+ factory.advanceTimeMs(3L);
+ verifyRecord(topicName, rawKey, rawValue, 3L, factory.create(rawKey, value));
+ }
+
+ @Test
+ public void shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp() {
+ verifyRecord(topicName, null, rawValue, timestamp, factory.create(value, timestamp));
+ }
+
+ @Test
+ public void shouldCreateNullKeyConsumerRecordWithTimestampWithTimestamp() {
+ verifyRecord(topicName, null, rawValue, timestamp, factory.create(value, timestamp));
+ }
+
+ @Test
+ public void shouldCreateNullKeyConsumerRecord() {
+ verifyRecord(topicName, null, rawValue, 0L, factory.create(value));
+
+ factory.advanceTimeMs(3L);
+ verifyRecord(topicName, null, rawValue, 3L, factory.create(value));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldCreateConsumerRecordsFromKeyValuePairs() {
+ final ConsumerRecordFactory<String, Integer> factory =
+ new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, 0L);
+
+ final KeyValue[] keyValuePairs = new KeyValue[5];
+ final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
+
+ for (int i = 0; i < keyValuePairs.length; ++i) {
+ keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
+ rawKeyValuePairs[i] = KeyValue.pair(
+ stringSerializer.serialize(topicName, key + "-" + i),
+ integerSerializer.serialize(topicName, value + i));
+ }
+
+ final List<ConsumerRecord<byte[], byte[]>> records =
+ factory.create(Arrays.<KeyValue<String, Long>>asList(keyValuePairs));
+
+ for (int i = 0; i < keyValuePairs.length; ++i) {
+ verifyRecord(
+ topicName,
+ (byte[]) rawKeyValuePairs[i].key,
+ (byte[]) rawKeyValuePairs[i].value,
+ 0L,
+ records.get(i));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldCreateConsumerRecordsFromKeyValuePairsWithTimestampAndIncrements() {
+ final ConsumerRecordFactory<String, Integer> factory =
+ new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, timestamp, 2L);
+
+ final KeyValue[] keyValuePairs = new KeyValue[5];
+ final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
+
+ for (int i = 0; i < keyValuePairs.length; ++i) {
+ keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
+ rawKeyValuePairs[i] = KeyValue.pair(
+ stringSerializer.serialize(topicName, key + "-" + i),
+ integerSerializer.serialize(topicName, value + i));
+ }
+
+ final List<ConsumerRecord<byte[], byte[]>> records =
+ factory.create(Arrays.<KeyValue<String, Long>>asList(keyValuePairs));
+
+ for (int i = 0; i < keyValuePairs.length; ++i) {
+ verifyRecord(
+ topicName,
+ (byte[]) rawKeyValuePairs[i].key,
+ (byte[]) rawKeyValuePairs[i].value,
+ timestamp + 2L * i,
+ records.get(i));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldCreateConsumerRecordsFromKeyValuePairsWithCustomTimestampAndIncrementsAndNotAdvanceTime() {
+ final ConsumerRecordFactory<String, Integer> factory =
+ new ConsumerRecordFactory<>(topicName, stringSerializer, integerSerializer, 0L);
+
+ final KeyValue[] keyValuePairs = new KeyValue[5];
+ final KeyValue[] rawKeyValuePairs = new KeyValue[keyValuePairs.length];
+
+ for (int i = 0; i < keyValuePairs.length; ++i) {
+ keyValuePairs[i] = KeyValue.pair(key + "-" + i, value + i);
+ rawKeyValuePairs[i] = KeyValue.pair(
+ stringSerializer.serialize(topicName, key + "-" + i),
+ integerSerializer.serialize(topicName, value + i));
+ }
+
+ final List<ConsumerRecord<byte[], byte[]>> records =
+ factory.create(Arrays.<KeyValue<String, Long>>asList(keyValuePairs), timestamp, 2L);
+
+ for (int i = 0; i < keyValuePairs.length; ++i) {
+ verifyRecord(
+ topicName,
+ (byte[]) rawKeyValuePairs[i].key,
+ (byte[]) rawKeyValuePairs[i].value,
+ timestamp + 2L * i,
+ records.get(i));
+ }
+
+ // should not have incremented internally tracked time
+ verifyRecord(topicName, null, rawValue, 0L, factory.create(value));
+ }
+
+ private void verifyRecord(final String topicName,
+ final byte[] rawKey,
+ final byte[] rawValue,
+ final long timestamp,
+ final ConsumerRecord<byte[], byte[]> record) {
+ assertEquals(topicName, record.topic());
+ assertArrayEquals(rawKey, record.key());
+ assertArrayEquals(rawValue, record.value());
+ assertEquals(timestamp, record.timestamp());
+ }
+
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/OutputVerifierTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/OutputVerifierTest.java
new file mode 100644
index 0000000..8220188
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/OutputVerifierTest.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+public class OutputVerifierTest {
+ private final byte[] key = new byte[0];
+ private final byte[] value = new byte[0];
+
+ private final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ value
+ );
+
+ private final ProducerRecord<byte[], byte[]> nullKeyValueRecord = new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null
+ );
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordForCompareValue() {
+ OutputVerifier.compareValue(null, value);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValue() {
+ OutputVerifier.compareValue((ProducerRecord<byte[], byte[]>) null, producerRecord);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullExpectedRecordForCompareValue() {
+ OutputVerifier.compareValue(producerRecord, (ProducerRecord<byte[], byte[]>) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(null, key, value);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(null, producerRecord);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullExpectedRecordForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(producerRecord, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(null, value, 0L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(null, producerRecord);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullExpectedRecordForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(producerRecord, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(null, key, value, 0L);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(null, producerRecord);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, null);
+ }
+
+ @Test
+ public void shouldPassIfValueIsEqualForCompareValue() {
+ OutputVerifier.compareValue(producerRecord, value);
+ }
+
+ @Test
+ public void shouldPassIfValueIsEqualWithNullForCompareValue() {
+ OutputVerifier.compareValue(nullKeyValueRecord, (byte[]) null);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareValue() {
+ OutputVerifier.compareValue(producerRecord, key);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareValue() {
+ OutputVerifier.compareValue(producerRecord, (byte[]) null);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReverseForCompareValue() {
+ OutputVerifier.compareValue(nullKeyValueRecord, value);
+ }
+
+ @Test
+ public void shouldPassIfValueIsEqualForCompareValueWithProducerRecord() {
+ OutputVerifier.compareValue(producerRecord, new ProducerRecord<>(
+ "otherTopic",
+ 0,
+ 0L,
+ value,
+ value
+ ));
+ }
+
+ @Test
+ public void shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord() {
+ OutputVerifier.compareValue(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+ "otherTopic",
+ 0,
+ 0L,
+ value,
+ null
+ ));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareValueWithProducerRecord() {
+ OutputVerifier.compareValue(producerRecord, new ProducerRecord<>(
+ "sameTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ value,
+ key
+ ));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord() {
+ OutputVerifier.compareValue(producerRecord, new ProducerRecord<byte[], byte[]>(
+ "sameTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ value,
+ null
+ ));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueWithProducerRecord() {
+ OutputVerifier.compareValue(nullKeyValueRecord, new ProducerRecord<>(
+ "sameTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ value,
+ value
+ ));
+ }
+
+ @Test
+ public void shouldPassIfKeyAndValueIsEqualForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(producerRecord, key, value);
+ }
+
+ @Test
+ public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(nullKeyValueRecord, null, null);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(producerRecord, value, value);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(producerRecord, null, value);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ null,
+ value),
+ key,
+ value);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(producerRecord, key, key);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(producerRecord, key, null);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue() {
+ OutputVerifier.compareKeyValue(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ null),
+ key,
+ value);
+ }
+
+ @Test
+ public void shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
+ "otherTopic",
+ 0,
+ 0L,
+ key,
+ value));
+ }
+
+ @Test
+ public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+ "otherTopic",
+ 0,
+ 0L,
+ null,
+ null));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ value,
+ value));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ null,
+ value));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ null,
+ value),
+ producerRecord);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ key));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(producerRecord, new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ null));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord() {
+ OutputVerifier.compareKeyValue(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ null),
+ producerRecord);
+ }
+
+ @Test
+ public void shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(producerRecord, value, Long.MAX_VALUE);
+ }
+
+ @Test
+ public void shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(nullKeyValueRecord, null, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(producerRecord, key, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(producerRecord, null, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(nullKeyValueRecord, value, Long.MAX_VALUE);
+ }
+
+ @Test
+ public void shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord() {
+ OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
+ "otherTopic",
+ 0,
+ Long.MAX_VALUE,
+ value,
+ value
+ ));
+ }
+
+ @Test
+ public void shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestampWithProducerRecord() {
+ OutputVerifier.compareValueTimestamp(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+ "otherTopic",
+ 0,
+ Long.MAX_VALUE,
+ value,
+ null
+ ));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord() {
+ OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ key
+ ));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord() {
+ OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ null
+ ));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord() {
+ OutputVerifier.compareValueTimestamp(nullKeyValueRecord, new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ value
+ ));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfTimestampIsDifferentForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(producerRecord, value, 0);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfTimestampDifferentWithNullReverseForCompareValueTimestamp() {
+ OutputVerifier.compareValueTimestamp(nullKeyValueRecord, null, 0);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord() {
+ OutputVerifier.compareValueTimestamp(producerRecord, new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ 0L,
+ key,
+ value
+ ));
+ }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ @Test
+ public void shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, key, value, Long.MAX_VALUE);
+ }
+
+ @Test
+ public void shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(nullKeyValueRecord, null, null, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, value, value, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, null, value, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ null,
+ value),
+ key,
+ value,
+ Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, key, key, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, key, null, Long.MAX_VALUE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp() {
+ OutputVerifier.compareKeyValueTimestamp(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ null),
+ key,
+ value,
+ Long.MAX_VALUE);
+ }
+
+ @Test
+ public void shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
+ "otherTopic",
+ 0,
+ Long.MAX_VALUE,
+ key,
+ value));
+ }
+
+ @Test
+ public void shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(nullKeyValueRecord, new ProducerRecord<byte[], byte[]>(
+ "otherTopic",
+ 0,
+ Long.MAX_VALUE,
+ null,
+ null));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ value,
+ value));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ null,
+ value));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ null,
+ value),
+ producerRecord);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ key));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(producerRecord, new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ null));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord() {
+ OutputVerifier.compareKeyValueTimestamp(
+ new ProducerRecord<byte[], byte[]>(
+ "someTopic",
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE,
+ key,
+ null),
+ producerRecord);
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.