You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/12 04:06:46 UTC
[kafka] branch 2.4 updated: KAFKA-9503: Fix TopologyTestDriver
output order (#8065)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 42062a8 KAFKA-9503: Fix TopologyTestDriver output order (#8065)
42062a8 is described below
commit 42062a89f2a7333263c8aff3e67b0eba47b29377
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Feb 11 21:00:17 2020 -0600
KAFKA-9503: Fix TopologyTestDriver output order (#8065)
Migrates TopologyTestDriver processing to be closer to the same processing/ordering
semantics as KafkaStreams. This corrects the output order for recursive topologies
as reported in KAFKA-9503, and also works similarly in the case of task idling.
Cherry-pick of 998f1520f9af2dddfec9a9ac072f8dcf9d9004fd from trunk
Cherry-pick of 7b71cb92b539a547a99dc6dd094f475fd55a8572 from 2.5
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../streams/processor/internals/StreamTask.java | 6 +-
.../apache/kafka/streams/TopologyTestDriver.java | 219 ++++++++++++++-------
.../kafka/streams/TopologyTestDriverTest.java | 178 +++++++++++++++--
.../test-utils/src/test/resources/log4j.properties | 21 ++
4 files changed, 340 insertions(+), 84 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index cf161ac..fe0d7e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -382,7 +382,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* An active task is processable if its buffer contains data for all of its input
* source topic partitions, or if it is enforced to be processable
*/
- boolean isProcessable(final long now) {
+ public boolean isProcessable(final long now) {
if (partitionGroup.allPartitionsBuffered()) {
idleStartTime = RecordQueue.UNKNOWN;
return true;
@@ -1004,4 +1004,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
return partitionTimes;
}
+
+ public boolean hasRecordsQueued() {
+ return numBuffered() > 0;
+ }
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 39d40a8..b9e6d4b 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -211,9 +211,9 @@ public class TopologyTestDriver implements Closeable {
private final MockProducer<byte[], byte[]> producer;
private final Set<String> internalTopics = new HashSet<>();
- private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
- private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
- private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
+ private final Map<String, TopicPartition> partitionsByInputTopic = new HashMap<>();
+ private final Map<String, TopicPartition> globalPartitionsByInputTopic = new HashMap<>();
+ private final Map<TopicPartition, AtomicLong> offsetsByTopicOrPatternPartition = new HashMap<>();
private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
private final boolean eosEnabled;
@@ -274,6 +274,7 @@ public class TopologyTestDriver implements Closeable {
final Properties config,
final long initialWallClockTimeMs) {
final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
+ logIfTaskIdleEnabled(streamsConfig);
mockWallClockTime = new MockTime(initialWallClockTimeMs);
internalTopologyBuilder = builder;
@@ -343,16 +344,16 @@ public class TopologyTestDriver implements Closeable {
for (final String topic : processorTopology.sourceTopics()) {
final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
- partitionsByTopic.put(topic, tp);
- offsetsByTopicPartition.put(tp, new AtomicLong());
+ partitionsByInputTopic.put(topic, tp);
+ offsetsByTopicOrPatternPartition.put(tp, new AtomicLong());
}
- consumer.assign(partitionsByTopic.values());
+ consumer.assign(partitionsByInputTopic.values());
if (globalTopology != null) {
for (final String topicName : globalTopology.sourceTopics()) {
final TopicPartition partition = new TopicPartition(topicName, 0);
- globalPartitionsByTopic.put(topicName, partition);
- offsetsByTopicPartition.put(partition, new AtomicLong());
+ globalPartitionsByInputTopic.put(topicName, partition);
+ offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
consumer.updatePartitions(topicName, Collections.singletonList(
new PartitionInfo(topicName, 0, null, null, null)));
consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
@@ -390,10 +391,10 @@ public class TopologyTestDriver implements Closeable {
globalStateTask = null;
}
- if (!partitionsByTopic.isEmpty()) {
+ if (!partitionsByInputTopic.isEmpty()) {
task = new StreamTask(
TASK_ID,
- new HashSet<>(partitionsByTopic.values()),
+ new HashSet<>(partitionsByInputTopic.values()),
processorTopology,
consumer,
new StoreChangelogReader(
@@ -421,6 +422,20 @@ public class TopologyTestDriver implements Closeable {
eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
}
+ private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) {
+ final Long taskIdleTime = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+ if (taskIdleTime > 0) {
+ log.info("Detected {} config in use with TopologyTestDriver (set to {}ms)." +
+ " This means you might need to use TopologyTestDriver#advanceWallClockTime()" +
+ " or enqueue records on all partitions to allow Steams to make progress." +
+ " TopologyTestDriver will log a message each time it cannot process enqueued" +
+ " records due to {}.",
+ StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
+ taskIdleTime,
+ StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+ }
+ }
+
/**
* Get read-only handle on global metrics registry.
*
@@ -448,77 +463,114 @@ public class TopologyTestDriver implements Closeable {
consumerRecord.headers());
}
- private void pipeRecord(final ProducerRecord<byte[], byte[]> record) {
- pipeRecord(record.topic(), record.timestamp(), record.key(), record.value(), record.headers());
- }
-
private void pipeRecord(final String topicName,
- final Long timestamp,
+ final long timestamp,
final byte[] key,
final byte[] value,
final Headers headers) {
+ final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(topicName);
+ final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(topicName);
- if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
- validateSourceTopicNameRegexPattern(topicName);
+ if (inputTopicOrPatternPartition == null && globalInputTopicPartition == null) {
+ throw new IllegalArgumentException("Unknown topic: " + topicName);
}
- final TopicPartition topicPartition = getTopicPartition(topicName);
- if (topicPartition != null) {
- final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
- task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
- topicName,
- topicPartition.partition(),
- offset,
- timestamp,
- TimestampType.CREATE_TIME,
- (long) ConsumerRecord.NULL_CHECKSUM,
- key == null ? ConsumerRecord.NULL_SIZE : key.length,
- value == null ? ConsumerRecord.NULL_SIZE : value.length,
- key,
- value,
- headers)));
-
- // Process the record ...
- task.process();
- task.maybePunctuateStreamTime();
- task.commit();
- captureOutputRecords();
- } else {
- final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
- if (globalTopicPartition == null) {
- throw new IllegalArgumentException("Unknown topic: " + topicName);
+
+ if (inputTopicOrPatternPartition != null) {
+ enqueueTaskRecord(topicName, inputTopicOrPatternPartition, timestamp, key, value, headers);
+ completeAllProcessableWork();
+ }
+
+ if (globalInputTopicPartition != null) {
+ processGlobalRecord(globalInputTopicPartition, timestamp, key, value, headers);
+ }
+ }
+
+ private void enqueueTaskRecord(final String inputTopic,
+ final TopicPartition topicOrPatternPartition,
+ final long timestamp,
+ final byte[] key,
+ final byte[] value,
+ final Headers headers) {
+ task.addRecords(topicOrPatternPartition, Collections.singleton(new ConsumerRecord<>(
+ inputTopic,
+ topicOrPatternPartition.partition(),
+ offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1,
+ timestamp,
+ TimestampType.CREATE_TIME,
+ (long) ConsumerRecord.NULL_CHECKSUM,
+ key == null ? ConsumerRecord.NULL_SIZE : key.length,
+ value == null ? ConsumerRecord.NULL_SIZE : value.length,
+ key,
+ value,
+ headers)));
+ }
+
+ private void completeAllProcessableWork() {
+ // for internally triggered processing (like wall-clock punctuations),
+ // we might have buffered some records to internal topics that need to
+ // be piped back in to kick-start the processing loop. This is idempotent
+ // and therefore harmless in the case where all we've done is enqueued an
+ // input record from the user.
+ captureOutputsAndReEnqueueInternalResults();
+
+ // If the topology only has global tasks, then `task` would be null.
+ // For this method, it just means there's nothing to do.
+ if (task != null) {
+ while (task.hasRecordsQueued()) {
+ // Process the record ...
+ task.process();
+ task.maybePunctuateStreamTime();
+ task.commit();
+ captureOutputsAndReEnqueueInternalResults();
+ }
+ if (task.hasRecordsQueued()) {
+ log.info("Due to the {} configuration, there are currently some records" +
+ " that cannot be processed. Advancing wall-clock time or" +
+ " enqueuing records on the empty topics will allow" +
+ " Streams to process more.",
+ StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
}
- final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1;
- globalStateTask.update(new ConsumerRecord<>(
- globalTopicPartition.topic(),
- globalTopicPartition.partition(),
- offset,
- timestamp,
- TimestampType.CREATE_TIME,
- (long) ConsumerRecord.NULL_CHECKSUM,
- key == null ? ConsumerRecord.NULL_SIZE : key.length,
- value == null ? ConsumerRecord.NULL_SIZE : value.length,
- key,
- value,
- headers));
- globalStateTask.flushState();
}
}
+ private void processGlobalRecord(final TopicPartition globalInputTopicPartition,
+ final long timestamp,
+ final byte[] key,
+ final byte[] value,
+ final Headers headers) {
+ globalStateTask.update(new ConsumerRecord<>(
+ globalInputTopicPartition.topic(),
+ globalInputTopicPartition.partition(),
+ offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).incrementAndGet() - 1,
+ timestamp,
+ TimestampType.CREATE_TIME,
+ (long) ConsumerRecord.NULL_CHECKSUM,
+ key == null ? ConsumerRecord.NULL_SIZE : key.length,
+ value == null ? ConsumerRecord.NULL_SIZE : value.length,
+ key,
+ value,
+ headers));
+ globalStateTask.flushState();
+ }
private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) {
if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
- " cannot contain regex pattern for input record topic: " + inputRecordTopic +
- " and hence cannot process the message.");
+ " cannot contain regex pattern for input record topic: " + inputRecordTopic +
+ " and hence cannot process the message.");
}
}
}
- private TopicPartition getTopicPartition(final String topicName) {
- final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+ private TopicPartition getInputTopicOrPatternPartition(final String topicName) {
+ if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
+ validateSourceTopicNameRegexPattern(topicName);
+ }
+
+ final TopicPartition topicPartition = partitionsByInputTopic.get(topicName);
if (topicPartition == null) {
- for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) {
+ for (final Map.Entry<String, TopicPartition> entry : partitionsByInputTopic.entrySet()) {
if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
return entry.getValue();
}
@@ -527,7 +579,7 @@ public class TopologyTestDriver implements Closeable {
return topicPartition;
}
- private void captureOutputRecords() {
+ private void captureOutputsAndReEnqueueInternalResults() {
// Capture all the records sent to the producer ...
final List<ProducerRecord<byte[], byte[]>> output = producer.history();
producer.clear();
@@ -540,9 +592,27 @@ public class TopologyTestDriver implements Closeable {
// Forward back into the topology if the produced record is to an internal or a source topic ...
final String outputTopicName = record.topic();
- if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
- || globalPartitionsByTopic.containsKey(outputTopicName)) {
- pipeRecord(record);
+
+ final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(outputTopicName);
+ final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(outputTopicName);
+
+ if (inputTopicOrPatternPartition != null) {
+ enqueueTaskRecord(
+ outputTopicName,
+ inputTopicOrPatternPartition,
+ record.timestamp(),
+ record.key(),
+ record.value(),
+ record.headers());
+ }
+
+ if (globalInputTopicPartition != null) {
+ processGlobalRecord(
+ globalInputTopicPartition,
+ record.timestamp(),
+ record.key(),
+ record.value(),
+ record.headers());
}
}
}
@@ -589,7 +659,7 @@ public class TopologyTestDriver implements Closeable {
task.maybePunctuateSystemTime();
task.commit();
}
- captureOutputRecords();
+ completeAllProcessableWork();
}
/**
@@ -839,23 +909,23 @@ public class TopologyTestDriver implements Closeable {
private void throwIfBuiltInStore(final StateStore stateStore) {
if (stateStore instanceof TimestampedKeyValueStore) {
throw new IllegalArgumentException("Store " + stateStore.name()
- + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
+ + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
}
if (stateStore instanceof ReadOnlyKeyValueStore) {
throw new IllegalArgumentException("Store " + stateStore.name()
- + " is a key-value store and should be accessed via `getKeyValueStore()`");
+ + " is a key-value store and should be accessed via `getKeyValueStore()`");
}
if (stateStore instanceof TimestampedWindowStore) {
throw new IllegalArgumentException("Store " + stateStore.name()
- + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
+ + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
}
if (stateStore instanceof ReadOnlyWindowStore) {
throw new IllegalArgumentException("Store " + stateStore.name()
- + " is a window store and should be accessed via `getWindowStore()`");
+ + " is a window store and should be accessed via `getWindowStore()`");
}
if (stateStore instanceof ReadOnlySessionStore) {
throw new IllegalArgumentException("Store " + stateStore.name()
- + " is a session store and should be accessed via `getSessionStore()`");
+ + " is a session store and should be accessed via `getSessionStore()`");
}
}
@@ -1001,7 +1071,12 @@ public class TopologyTestDriver implements Closeable {
// ignore
}
}
- captureOutputRecords();
+ completeAllProcessableWork();
+ if (task != null && task.hasRecordsQueued()) {
+ log.warn("Found some records that cannot be processed due to the" +
+ " {} configuration during TopologyTestDriver#close().",
+ StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+ }
if (!eosEnabled) {
producer.close();
}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d7ac6b4..160c3fb 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -40,6 +41,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -73,6 +75,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -145,7 +149,7 @@ public class TopologyTestDriverTest {
private final String topic;
private final Headers headers;
- Record(final ConsumerRecord consumerRecord,
+ Record(final ConsumerRecord<byte[], byte[]> consumerRecord,
final long newOffset) {
key = consumerRecord.key();
value = consumerRecord.value();
@@ -156,7 +160,7 @@ public class TopologyTestDriverTest {
}
Record(final String newTopic,
- final TestRecord consumerRecord,
+ final TestRecord<byte[], byte[]> consumerRecord,
final long newOffset) {
key = consumerRecord.key();
value = consumerRecord.value();
@@ -231,7 +235,7 @@ public class TopologyTestDriverTest {
}
}
- private final static class MockProcessor implements Processor {
+ private final static class MockProcessor implements Processor<Object, Object> {
private final Collection<Punctuation> punctuations;
private ProcessorContext context;
@@ -266,7 +270,7 @@ public class TopologyTestDriverTest {
private final List<MockProcessor> mockProcessors = new ArrayList<>();
- private final class MockProcessorSupplier implements ProcessorSupplier {
+ private final class MockProcessorSupplier implements ProcessorSupplier<Object, Object> {
private final Collection<Punctuation> punctuations;
private MockProcessorSupplier() {
@@ -278,7 +282,7 @@ public class TopologyTestDriverTest {
}
@Override
- public Processor get() {
+ public Processor<Object, Object> get() {
final MockProcessor mockProcessor = new MockProcessor(punctuations);
mockProcessors.add(mockProcessor);
return mockProcessor;
@@ -452,7 +456,7 @@ public class TopologyTestDriverTest {
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
pipeRecord(SOURCE_TOPIC_1, testRecord1);
- final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+ final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
assertEquals(key1, outputRecord.key());
assertEquals(value1, outputRecord.value());
@@ -705,7 +709,7 @@ public class TopologyTestDriverTest {
pipeRecord(SOURCE_TOPIC_1, testRecord1);
- ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+ ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
assertEquals(key1, outputRecord.key());
assertEquals(value1, outputRecord.value());
assertEquals(SINK_TOPIC_1, outputRecord.topic());
@@ -1209,7 +1213,7 @@ public class TopologyTestDriverTest {
testDriver.pipeRecord(topic, new TestRecord<>(key, value, null, time),
new StringSerializer(), new LongSerializer(), null);
}
-
+
private void compareKeyValue(final TestRecord<String, Long> record, final String key, final Long value) {
assertThat(record.getKey(), equalTo(key));
assertThat(record.getValue(), equalTo(value));
@@ -1337,9 +1341,9 @@ public class TopologyTestDriverTest {
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor(
"storeProcessor",
- new ProcessorSupplier() {
+ new ProcessorSupplier<String, Long>() {
@Override
- public Processor get() {
+ public Processor<String, Long> get() {
return new Processor<String, Long>() {
private KeyValueStore<String, Long> store;
@@ -1472,7 +1476,7 @@ public class TopologyTestDriverTest {
testDriver = new TopologyTestDriver(topology, config);
pipeRecord(SOURCE_TOPIC_1, testRecord1);
- final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+ final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
assertEquals(key1, outputRecord.key());
assertEquals(value1, outputRecord.value());
assertEquals(SINK_TOPIC_1, outputRecord.topic());
@@ -1522,4 +1526,156 @@ public class TopologyTestDriverTest {
final TaskId taskId = new TaskId(0, 0);
assertTrue(new File(appDir, taskId.toString()).exists());
}
+
+ @Test
+ public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
+ final Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+ properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
+
+ final Topology topology = new Topology();
+ topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
+ topology.addProcessor(
+ "recursiveProcessor",
+ () -> new AbstractProcessor<String, String>() {
+ @Override
+ public void process(final String key, final String value) {
+ if (!value.startsWith("recurse-")) {
+ context().forward(key, "recurse-" + value, To.child("recursiveSink"));
+ }
+ context().forward(key, value, To.child("sink"));
+ }
+ },
+ "source"
+ );
+ topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+ topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+ final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
+ final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+
+ // given the topology above, we expect to see the output _first_ echo the input
+ // and _then_ print it with "recurse-" prepended.
+
+ in.pipeInput("B", "beta");
+ final List<KeyValue<String, String>> events = out.readKeyValuesToList();
+ assertThat(
+ events,
+ is(Arrays.asList(
+ new KeyValue<>("B", "beta"),
+ new KeyValue<>("B", "recurse-beta")
+ ))
+ );
+
+ }
+ }
+
+ @Test
+ public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
+ final Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+ properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
+
+ final Topology topology = new Topology();
+ topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
+ topology.addGlobalStore(
+ Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("globule-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(),
+ "globuleSource",
+ new StringDeserializer(),
+ new StringDeserializer(),
+ "globule-topic",
+ "globuleProcessor",
+ () -> new Processor<String, String>() {
+ private KeyValueStore<String, String> stateStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ stateStore = (KeyValueStore<String, String>) context.getStateStore("globule-store");
+ }
+
+ @Override
+ public void process(final String key, final String value) {
+ stateStore.put(key, value);
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+ );
+ topology.addProcessor(
+ "recursiveProcessor",
+ () -> new AbstractProcessor<String, String>() {
+ @Override
+ public void process(final String key, final String value) {
+ if (!value.startsWith("recurse-")) {
+ context().forward(key, "recurse-" + value, To.child("recursiveSink"));
+ }
+ context().forward(key, value, To.child("sink"));
+ context().forward(key, value, To.child("globuleSink"));
+ }
+ },
+ "source"
+ );
+ topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+ topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+ topology.addSink("globuleSink", "globule-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+ final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
+ final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("globule-topic", new StringDeserializer(), new StringDeserializer());
+
+ in.pipeInput("A", "alpha");
+
+ // expect the global store to correctly reflect the last update
+ final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("globule-store");
+ assertThat(keyValueStore, notNullValue());
+ assertThat(keyValueStore.get("A"), is("recurse-alpha"));
+
+ // and also just make sure the test really sent both events to the topic.
+ final List<KeyValue<String, String>> events = globalTopic.readKeyValuesToList();
+ assertThat(
+ events,
+ is(Arrays.asList(
+ new KeyValue<>("A", "alpha"),
+ new KeyValue<>("A", "recurse-alpha")
+ ))
+ );
+ }
+ }
+
+ @Test
+ public void shouldTerminateWhenUsingTaskIdling() {
+ final Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+ properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
+
+ // This is the key to this test. Wall-clock time doesn't advance automatically in TopologyTestDriver,
+ // so with an idle time specified, TTD can't just expect all enqueued records to be processable.
+ properties.setProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "1000");
+
+ final Topology topology = new Topology();
+ topology.addSource("source1", new StringDeserializer(), new StringDeserializer(), "input1");
+ topology.addSource("source2", new StringDeserializer(), new StringDeserializer(), "input2");
+ topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "source1", "source2");
+
+ try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+ final TestInputTopic<String, String> in1 = topologyTestDriver.createInputTopic("input1", new StringSerializer(), new StringSerializer());
+ final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+
+ in1.pipeInput("A", "alpha");
+
+ // Since the task has two inputs, and only one is buffered, task idling would normally prevent us from
+ // processing A, but we ignore that and process it anyway.
+ assertThat(
+ out.readKeyValuesToList(),
+ is(Collections.singletonList(
+ new KeyValue<>("A", "alpha")
+ ))
+ );
+ }
+ }
}
diff --git a/streams/test-utils/src/test/resources/log4j.properties b/streams/test-utils/src/test/resources/log4j.properties
new file mode 100644
index 0000000..be36f90
--- /dev/null
+++ b/streams/test-utils/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=INFO