You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/12 03:49:05 UTC

[kafka] branch 2.5 updated: KAFKA-9503: Fix TopologyTestDriver output order (#8065)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 7b71cb9  KAFKA-9503: Fix TopologyTestDriver output order (#8065)
7b71cb9 is described below

commit 7b71cb92b539a547a99dc6dd094f475fd55a8572
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Feb 11 21:00:17 2020 -0600

    KAFKA-9503: Fix TopologyTestDriver output order (#8065)
    
    Migrates TopologyTestDriver processing to be closer to the same processing/ordering
    semantics as KafkaStreams. This corrects the output order for recursive topologies
    as reported in KAFKA-9503, and also works similarly in the case of task idling.
    
    Cherry-pick of 998f1520f9af2dddfec9a9ac072f8dcf9d9004fd from trunk
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    |   6 +-
 .../apache/kafka/streams/TopologyTestDriver.java   | 223 ++++++++++++++-------
 .../kafka/streams/TopologyTestDriverTest.java      | 196 +++++++++++++++++-
 .../test-utils/src/test/resources/log4j.properties |  21 ++
 4 files changed, 361 insertions(+), 85 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5a7790a..ae06bc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -335,7 +335,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * An active task is processable if its buffer contains data for all of its input
      * source topic partitions, or if it is enforced to be processable
      */
-    boolean isProcessable(final long now) {
+    public boolean isProcessable(final long now) {
         if (partitionGroup.allPartitionsBuffered()) {
             idleStartTime = RecordQueue.UNKNOWN;
             return true;
@@ -981,4 +981,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     public Map<TopicPartition, Long> restoredOffsets() {
         return stateMgr.changelogReader().restoredOffsets();
     }
+
+    public boolean hasRecordsQueued() {
+        return numBuffered() > 0;
+    }
 }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index cffffbe..442b7b9 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -209,9 +209,9 @@ public class TopologyTestDriver implements Closeable {
     private final MockProducer<byte[], byte[]> producer;
 
     private final Set<String> internalTopics = new HashSet<>();
-    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
-    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
-    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
+    private final Map<String, TopicPartition> partitionsByInputTopic = new HashMap<>();
+    private final Map<String, TopicPartition> globalPartitionsByInputTopic = new HashMap<>();
+    private final Map<TopicPartition, AtomicLong> offsetsByTopicOrPatternPartition = new HashMap<>();
 
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
     private final boolean eosEnabled;
@@ -272,6 +272,7 @@ public class TopologyTestDriver implements Closeable {
                                final Properties config,
                                final long initialWallClockTimeMs) {
         final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
+        logIfTaskIdleEnabled(streamsConfig);
         mockWallClockTime = new MockTime(initialWallClockTimeMs);
 
         internalTopologyBuilder = builder;
@@ -329,12 +330,12 @@ public class TopologyTestDriver implements Closeable {
 
         for (final String topic : processorTopology.sourceTopics()) {
             final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
-            partitionsByTopic.put(topic, tp);
-            offsetsByTopicPartition.put(tp, new AtomicLong());
+            partitionsByInputTopic.put(topic, tp);
+            offsetsByTopicOrPatternPartition.put(tp, new AtomicLong());
         }
-        consumer.assign(partitionsByTopic.values());
+        consumer.assign(partitionsByInputTopic.values());
         final Map<TopicPartition, Long> startOffsets = new HashMap<>();
-        for (final TopicPartition topicPartition : partitionsByTopic.values()) {
+        for (final TopicPartition topicPartition : partitionsByInputTopic.values()) {
             startOffsets.put(topicPartition, 0L);
         }
         consumer.updateBeginningOffsets(startOffsets);
@@ -343,8 +344,8 @@ public class TopologyTestDriver implements Closeable {
             final MockConsumer<byte[], byte[]> globalConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
             for (final String topicName : globalTopology.sourceTopics()) {
                 final TopicPartition partition = new TopicPartition(topicName, 0);
-                globalPartitionsByTopic.put(topicName, partition);
-                offsetsByTopicPartition.put(partition, new AtomicLong());
+                globalPartitionsByInputTopic.put(topicName, partition);
+                offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
                 globalConsumer.updatePartitions(topicName, Collections.singletonList(
                     new PartitionInfo(topicName, 0, null, null, null)));
                 globalConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
@@ -382,10 +383,10 @@ public class TopologyTestDriver implements Closeable {
             globalStateTask = null;
         }
 
-        if (!partitionsByTopic.isEmpty()) {
+        if (!partitionsByInputTopic.isEmpty()) {
             task = new StreamTask(
                 TASK_ID,
-                new HashSet<>(partitionsByTopic.values()),
+                new HashSet<>(partitionsByInputTopic.values()),
                 processorTopology,
                 consumer,
                 new StoreChangelogReader(
@@ -413,6 +414,20 @@ public class TopologyTestDriver implements Closeable {
         eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
     }
 
+    private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) {
+        final Long taskIdleTime = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        if (taskIdleTime > 0) {
+            log.info("Detected {} config in use with TopologyTestDriver (set to {}ms)." +
+                         " This means you might need to use TopologyTestDriver#advanceWallClockTime()" +
+                         " or enqueue records on all partitions to allow Steams to make progress." +
+                         " TopologyTestDriver will log a message each time it cannot process enqueued" +
+                         " records due to {}.",
+                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
+                     taskIdleTime,
+                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        }
+    }
+
     /**
      * Get read-only handle on global metrics registry.
      *
@@ -440,77 +455,116 @@ public class TopologyTestDriver implements Closeable {
             consumerRecord.headers());
     }
 
-    private void pipeRecord(final ProducerRecord<byte[], byte[]> record) {
-        pipeRecord(record.topic(), record.timestamp(), record.key(), record.value(), record.headers());
-    }
-
     private void pipeRecord(final String topicName,
-                            final Long timestamp,
+                            final long timestamp,
                             final byte[] key,
                             final byte[] value,
                             final Headers headers) {
+        final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(topicName);
+        final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(topicName);
 
-        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
-            validateSourceTopicNameRegexPattern(topicName);
+        if (inputTopicOrPatternPartition == null && globalInputTopicPartition == null) {
+            throw new IllegalArgumentException("Unknown topic: " + topicName);
         }
-        final TopicPartition topicPartition = getTopicPartition(topicName);
-        if (topicPartition != null) {
-            final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
-            task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
-                    topicName,
-                    topicPartition.partition(),
-                    offset,
-                    timestamp,
-                    TimestampType.CREATE_TIME,
-                    (long) ConsumerRecord.NULL_CHECKSUM,
-                    key == null ? ConsumerRecord.NULL_SIZE : key.length,
-                    value == null ? ConsumerRecord.NULL_SIZE : value.length,
-                    key,
-                    value,
-                    headers)));
-
-            // Process the record ...
-            task.process();
-            task.maybePunctuateStreamTime();
-            task.commit();
-            captureOutputRecords();
-        } else {
-            final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
-            if (globalTopicPartition == null) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
+
+        if (inputTopicOrPatternPartition != null) {
+            enqueueTaskRecord(topicName, inputTopicOrPatternPartition, timestamp, key, value, headers);
+            completeAllProcessableWork();
+        }
+
+        if (globalInputTopicPartition != null) {
+            processGlobalRecord(globalInputTopicPartition, timestamp, key, value, headers);
+        }
+    }
+
+    private void enqueueTaskRecord(final String inputTopic,
+                                   final TopicPartition topicOrPatternPartition,
+                                   final long timestamp,
+                                   final byte[] key,
+                                   final byte[] value,
+                                   final Headers headers) {
+        task.addRecords(topicOrPatternPartition, Collections.singleton(new ConsumerRecord<>(
+            inputTopic,
+            topicOrPatternPartition.partition(),
+            offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1,
+            timestamp,
+            TimestampType.CREATE_TIME,
+            (long) ConsumerRecord.NULL_CHECKSUM,
+            key == null ? ConsumerRecord.NULL_SIZE : key.length,
+            value == null ? ConsumerRecord.NULL_SIZE : value.length,
+            key,
+            value,
+            headers)));
+    }
+
+    private void completeAllProcessableWork() {
+        // for internally triggered processing (like wall-clock punctuations),
+        // we might have buffered some records to internal topics that need to
+        // be piped back in to kick-start the processing loop. This is idempotent
+        // and therefore harmless in the case where all we've done is enqueued an
+        // input record from the user.
+        captureOutputsAndReEnqueueInternalResults();
+
+        // If the topology only has global tasks, then `task` would be null.
+        // For this method, it just means there's nothing to do.
+        if (task != null) {
+            while (task.hasRecordsQueued() && task.isProcessable(mockWallClockTime.milliseconds())) {
+                // Process the record ...
+                if (task.isProcessable(mockWallClockTime.milliseconds())) {
+                    task.process();
+                }
+                task.maybePunctuateStreamTime();
+                task.commit();
+                captureOutputsAndReEnqueueInternalResults();
+            }
+            if (task.hasRecordsQueued()) {
+                log.info("Due to the {} configuration, there are currently some records" +
+                             " that cannot be processed. Advancing wall-clock time or" +
+                             " enqueuing records on the empty topics will allow" +
+                             " Streams to process more.",
+                         StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
             }
-            final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1;
-            globalStateTask.update(new ConsumerRecord<>(
-                    globalTopicPartition.topic(),
-                    globalTopicPartition.partition(),
-                    offset,
-                    timestamp,
-                    TimestampType.CREATE_TIME,
-                    (long) ConsumerRecord.NULL_CHECKSUM,
-                    key == null ? ConsumerRecord.NULL_SIZE : key.length,
-                    value == null ? ConsumerRecord.NULL_SIZE : value.length,
-                    key,
-                    value,
-                    headers));
-            globalStateTask.flushState();
         }
     }
 
+    private void processGlobalRecord(final TopicPartition globalInputTopicPartition,
+                                     final long timestamp,
+                                     final byte[] key,
+                                     final byte[] value,
+                                     final Headers headers) {
+        globalStateTask.update(new ConsumerRecord<>(
+            globalInputTopicPartition.topic(),
+            globalInputTopicPartition.partition(),
+            offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).incrementAndGet() - 1,
+            timestamp,
+            TimestampType.CREATE_TIME,
+            (long) ConsumerRecord.NULL_CHECKSUM,
+            key == null ? ConsumerRecord.NULL_SIZE : key.length,
+            value == null ? ConsumerRecord.NULL_SIZE : value.length,
+            key,
+            value,
+            headers));
+        globalStateTask.flushState();
+    }
 
     private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
         for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) {
             if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
                 throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
-                        " cannot contain regex pattern for input record topic: " + inputRecordTopic +
-                        " and hence cannot process the message.");
+                                                " cannot contain regex pattern for input record topic: " + inputRecordTopic +
+                                                " and hence cannot process the message.");
             }
         }
     }
 
-    private TopicPartition getTopicPartition(final String topicName) {
-        final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+    private TopicPartition getInputTopicOrPatternPartition(final String topicName) {
+        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
+            validateSourceTopicNameRegexPattern(topicName);
+        }
+
+        final TopicPartition topicPartition = partitionsByInputTopic.get(topicName);
         if (topicPartition == null) {
-            for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) {
+            for (final Map.Entry<String, TopicPartition> entry : partitionsByInputTopic.entrySet()) {
                 if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
                     return entry.getValue();
                 }
@@ -519,7 +573,7 @@ public class TopologyTestDriver implements Closeable {
         return topicPartition;
     }
 
-    private void captureOutputRecords() {
+    private void captureOutputsAndReEnqueueInternalResults() {
         // Capture all the records sent to the producer ...
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
         producer.clear();
@@ -532,9 +586,27 @@ public class TopologyTestDriver implements Closeable {
 
             // Forward back into the topology if the produced record is to an internal or a source topic ...
             final String outputTopicName = record.topic();
-            if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
-                || globalPartitionsByTopic.containsKey(outputTopicName)) {
-                pipeRecord(record);
+
+            final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(outputTopicName);
+            final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(outputTopicName);
+
+            if (inputTopicOrPatternPartition != null) {
+                enqueueTaskRecord(
+                    outputTopicName,
+                    inputTopicOrPatternPartition,
+                    record.timestamp(),
+                    record.key(),
+                    record.value(),
+                    record.headers());
+            }
+
+            if (globalInputTopicPartition != null) {
+                processGlobalRecord(
+                    globalInputTopicPartition,
+                    record.timestamp(),
+                    record.key(),
+                    record.value(),
+                    record.headers());
             }
         }
     }
@@ -581,7 +653,7 @@ public class TopologyTestDriver implements Closeable {
             task.maybePunctuateSystemTime();
             task.commit();
         }
-        captureOutputRecords();
+        completeAllProcessableWork();
     }
 
     /**
@@ -831,23 +903,23 @@ public class TopologyTestDriver implements Closeable {
     private void throwIfBuiltInStore(final StateStore stateStore) {
         if (stateStore instanceof TimestampedKeyValueStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
+                                                   + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
         }
         if (stateStore instanceof ReadOnlyKeyValueStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a key-value store and should be accessed via `getKeyValueStore()`");
+                                                   + " is a key-value store and should be accessed via `getKeyValueStore()`");
         }
         if (stateStore instanceof TimestampedWindowStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
+                                                   + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
         }
         if (stateStore instanceof ReadOnlyWindowStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a window store and should be accessed via `getWindowStore()`");
+                                                   + " is a window store and should be accessed via `getWindowStore()`");
         }
         if (stateStore instanceof ReadOnlySessionStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a session store and should be accessed via `getSessionStore()`");
+                                                   + " is a session store and should be accessed via `getSessionStore()`");
         }
     }
 
@@ -993,7 +1065,12 @@ public class TopologyTestDriver implements Closeable {
                 // ignore
             }
         }
-        captureOutputRecords();
+        completeAllProcessableWork();
+        if (task != null && task.hasRecordsQueued()) {
+            log.warn("Found some records that cannot be processed due to the" +
+                         " {} configuration during TopologyTestDriver#close().",
+                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        }
         if (!eosEnabled) {
             producer.close();
         }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d7ac6b4..60ab516 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -40,6 +41,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -73,6 +75,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -145,7 +149,7 @@ public class TopologyTestDriverTest {
         private final String topic;
         private final Headers headers;
 
-        Record(final ConsumerRecord consumerRecord,
+        Record(final ConsumerRecord<byte[], byte[]> consumerRecord,
                final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
@@ -156,7 +160,7 @@ public class TopologyTestDriverTest {
         }
 
         Record(final String newTopic,
-               final TestRecord consumerRecord,
+               final TestRecord<byte[], byte[]> consumerRecord,
                final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
@@ -231,7 +235,7 @@ public class TopologyTestDriverTest {
         }
     }
 
-    private final static class MockProcessor implements Processor {
+    private final static class MockProcessor implements Processor<Object, Object> {
         private final Collection<Punctuation> punctuations;
         private ProcessorContext context;
 
@@ -266,7 +270,7 @@ public class TopologyTestDriverTest {
 
     private final List<MockProcessor> mockProcessors = new ArrayList<>();
 
-    private final class MockProcessorSupplier implements ProcessorSupplier {
+    private final class MockProcessorSupplier implements ProcessorSupplier<Object, Object> {
         private final Collection<Punctuation> punctuations;
 
         private MockProcessorSupplier() {
@@ -278,7 +282,7 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public Processor get() {
+        public Processor<Object, Object> get() {
             final MockProcessor mockProcessor = new MockProcessor(punctuations);
             mockProcessors.add(mockProcessor);
             return mockProcessor;
@@ -452,7 +456,7 @@ public class TopologyTestDriverTest {
         testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
-        final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+        final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
 
         assertEquals(key1, outputRecord.key());
         assertEquals(value1, outputRecord.value());
@@ -705,7 +709,7 @@ public class TopologyTestDriverTest {
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
-        ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+        ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
         assertEquals(key1, outputRecord.key());
         assertEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_1, outputRecord.topic());
@@ -1209,7 +1213,7 @@ public class TopologyTestDriverTest {
         testDriver.pipeRecord(topic, new TestRecord<>(key, value, null, time),
                 new StringSerializer(), new LongSerializer(), null);
     }
-    
+
     private void compareKeyValue(final TestRecord<String, Long> record, final String key, final Long value) {
         assertThat(record.getKey(), equalTo(key));
         assertThat(record.getValue(), equalTo(value));
@@ -1337,9 +1341,9 @@ public class TopologyTestDriverTest {
         topology.addSource("sourceProcessor", "input-topic");
         topology.addProcessor(
             "storeProcessor",
-            new ProcessorSupplier() {
+            new ProcessorSupplier<String, Long>() {
                 @Override
-                public Processor get() {
+                public Processor<String, Long> get() {
                     return new Processor<String, Long>() {
                         private KeyValueStore<String, Long> store;
 
@@ -1472,7 +1476,7 @@ public class TopologyTestDriverTest {
         testDriver = new TopologyTestDriver(topology, config);
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
-        final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+        final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
         assertEquals(key1, outputRecord.key());
         assertEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_1, outputRecord.topic());
@@ -1522,4 +1526,174 @@ public class TopologyTestDriverTest {
         final TaskId taskId = new TaskId(0, 0);
         assertTrue(new File(appDir, taskId.toString()).exists());
     }
+
+    @Test
+    public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
+
+        final Topology topology = new Topology();
+        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
+        topology.addProcessor(
+            "recursiveProcessor",
+            () -> new AbstractProcessor<String, String>() {
+                @Override
+                public void process(final String key, final String value) {
+                    if (!value.startsWith("recurse-")) {
+                        context().forward(key, "recurse-" + value, To.child("recursiveSink"));
+                    }
+                    context().forward(key, value, To.child("sink"));
+                }
+            },
+            "source"
+        );
+        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+
+        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+            final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+
+            // given the topology above, we expect to see the output _first_ echo the input
+            // and _then_ print it with "recurse-" prepended.
+
+            in.pipeInput("B", "beta");
+            final List<KeyValue<String, String>> events = out.readKeyValuesToList();
+            assertThat(
+                events,
+                is(Arrays.asList(
+                    new KeyValue<>("B", "beta"),
+                    new KeyValue<>("B", "recurse-beta")
+                ))
+            );
+
+        }
+    }
+
+    @Test
+    public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
+
+        final Topology topology = new Topology();
+        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
+        topology.addGlobalStore(
+            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("globule-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(),
+            "globuleSource",
+            new StringDeserializer(),
+            new StringDeserializer(),
+            "globule-topic",
+            "globuleProcessor",
+            () -> new Processor<String, String>() {
+                private KeyValueStore<String, String> stateStore;
+
+                @SuppressWarnings("unchecked")
+                @Override
+                public void init(final ProcessorContext context) {
+                    stateStore = (KeyValueStore<String, String>) context.getStateStore("globule-store");
+                }
+
+                @Override
+                public void process(final String key, final String value) {
+                    stateStore.put(key, value);
+                }
+
+                @Override
+                public void close() {
+
+                }
+            }
+        );
+        topology.addProcessor(
+            "recursiveProcessor",
+            () -> new AbstractProcessor<String, String>() {
+                @Override
+                public void process(final String key, final String value) {
+                    if (!value.startsWith("recurse-")) {
+                        context().forward(key, "recurse-" + value, To.child("recursiveSink"));
+                    }
+                    context().forward(key, value, To.child("sink"));
+                    context().forward(key, value, To.child("globuleSink"));
+                }
+            },
+            "source"
+        );
+        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+        topology.addSink("globuleSink", "globule-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
+
+        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+            final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("globule-topic", new StringDeserializer(), new StringDeserializer());
+
+            in.pipeInput("A", "alpha");
+
+            // expect the global store to correctly reflect the last update
+            final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("globule-store");
+            assertThat(keyValueStore, notNullValue());
+            assertThat(keyValueStore.get("A"), is("recurse-alpha"));
+
+            // and also just make sure the test really sent both events to the topic.
+            final List<KeyValue<String, String>> events = globalTopic.readKeyValuesToList();
+            assertThat(
+                events,
+                is(Arrays.asList(
+                    new KeyValue<>("A", "alpha"),
+                    new KeyValue<>("A", "recurse-alpha")
+                ))
+            );
+        }
+    }
+
+    @Test
+    public void shouldRespectTaskIdling() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
+
+        // Setting a max task idle time is the key to this test. Wall-clock time doesn't advance automatically in
+        // TopologyTestDriver (TTD), so with an idle time of 1000 ms specified, TTD can't just expect all enqueued records to be processable.
+        properties.setProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "1000");
+
+        final Topology topology = new Topology();
+        topology.addSource("source1", new StringDeserializer(), new StringDeserializer(), "input1");
+        topology.addSource("source2", new StringDeserializer(), new StringDeserializer(), "input2");
+        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "source1", "source2");
+
+        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
+            final TestInputTopic<String, String> in1 = topologyTestDriver.createInputTopic("input1", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> in2 = topologyTestDriver.createInputTopic("input2", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
+
+            in1.pipeInput("A", "alpha");
+            topologyTestDriver.advanceWallClockTime(Duration.ofMillis(1));
+
+            // only input1 has a record, and only 1 ms of wall-clock time has elapsed, so nothing has been output yet
+            assertThat(out.readKeyValuesToList(), is(Collections.emptyList()));
+
+            in2.pipeInput("B", "beta");
+
+            // because both input topics now have records, we can process (even though only 1 ms has elapsed),
+            // but after processing A (the earlier record), only input2 still has a record queued, so
+            // task idling takes effect again and B is not output yet
+            assertThat(
+                out.readKeyValuesToList(),
+                is(Collections.singletonList(
+                    new KeyValue<>("A", "alpha")
+                ))
+            );
+
+            topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(1));
+
+            // now that another full second of wall-clock time has elapsed, the 1000 ms idle time has expired, and we can process B
+            assertThat(
+                out.readKeyValuesToList(),
+                is(Collections.singletonList(
+                    new KeyValue<>("B", "beta")
+                ))
+            );
+        }
+    }
 }
diff --git a/streams/test-utils/src/test/resources/log4j.properties b/streams/test-utils/src/test/resources/log4j.properties
new file mode 100644
index 0000000..be36f90
--- /dev/null
+++ b/streams/test-utils/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=INFO