You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/02/12 03:01:25 UTC

[kafka] branch revert-8065-kafka-9503-fix-ttd-processing-order created (now 90e9a83)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch revert-8065-kafka-9503-fix-ttd-processing-order
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 90e9a83  Revert "KAKFA-9503: Fix TopologyTestDriver output order (#8065)"

This branch includes the following new commits:

     new 90e9a83  Revert "KAKFA-9503: Fix TopologyTestDriver output order (#8065)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/01: Revert "KAKFA-9503: Fix TopologyTestDriver output order (#8065)"

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch revert-8065-kafka-9503-fix-ttd-processing-order
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 90e9a83568329e3c41c0a4922d7da64d69ce0572
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Feb 11 21:00:28 2020 -0600

    Revert "KAKFA-9503: Fix TopologyTestDriver output order (#8065)"
    
    This reverts commit 998f1520f9af2dddfec9a9ac072f8dcf9d9004fd.
---
 .../streams/processor/internals/StreamTask.java    |   6 +-
 .../apache/kafka/streams/TopologyTestDriver.java   | 231 +++++++--------------
 .../kafka/streams/TopologyTestDriverTest.java      | 196 +----------------
 .../test-utils/src/test/resources/log4j.properties |  21 --
 4 files changed, 90 insertions(+), 364 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f025b00..8cd6e5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -444,7 +444,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      * An active task is processable if its buffer contains data for all of its input
      * source topic partitions, or if it is enforced to be processable
      */
-    public boolean isProcessable(final long wallClockTime) {
+    private boolean isProcessable(final long wallClockTime) {
         if (partitionGroup.allPartitionsBuffered()) {
             idleStartTime = RecordQueue.UNKNOWN;
             return true;
@@ -907,10 +907,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    public boolean hasRecordsQueued() {
-        return numBuffered() > 0;
-    }
-
     // below are visible for testing only
     RecordCollector recordCollector() {
         return recordCollector;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 52e5aa2..9cf0e12 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -43,6 +43,11 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.KeyValueStoreFacade;
 import org.apache.kafka.streams.internals.QuietStreamsConfig;
 import org.apache.kafka.streams.internals.WindowStoreFacade;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
+import org.apache.kafka.streams.processor.internals.Task;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -57,16 +62,11 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -213,9 +213,9 @@ public class TopologyTestDriver implements Closeable {
     private final MockProducer<byte[], byte[]> producer;
 
     private final Set<String> internalTopics = new HashSet<>();
-    private final Map<String, TopicPartition> partitionsByInputTopic = new HashMap<>();
-    private final Map<String, TopicPartition> globalPartitionsByInputTopic = new HashMap<>();
-    private final Map<TopicPartition, AtomicLong> offsetsByTopicOrPatternPartition = new HashMap<>();
+    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
+    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
+    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
 
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
     private final boolean eosEnabled;
@@ -287,7 +287,6 @@ public class TopologyTestDriver implements Closeable {
                                final Properties config,
                                final long initialWallClockTimeMs) {
         final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
-        logIfTaskIdleEnabled(streamsConfig);
         mockWallClockTime = new MockTime(initialWallClockTimeMs);
 
         internalTopologyBuilder = builder;
@@ -335,16 +334,16 @@ public class TopologyTestDriver implements Closeable {
 
         for (final String topic : processorTopology.sourceTopics()) {
             final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
-            partitionsByInputTopic.put(topic, tp);
-            offsetsByTopicOrPatternPartition.put(tp, new AtomicLong());
+            partitionsByTopic.put(topic, tp);
+            offsetsByTopicPartition.put(tp, new AtomicLong());
         }
-        consumer.assign(partitionsByInputTopic.values());
+        consumer.assign(partitionsByTopic.values());
 
         if (globalTopology != null) {
             for (final String topicName : globalTopology.sourceTopics()) {
                 final TopicPartition partition = new TopicPartition(topicName, 0);
-                globalPartitionsByInputTopic.put(topicName, partition);
-                offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
+                globalPartitionsByTopic.put(topicName, partition);
+                offsetsByTopicPartition.put(partition, new AtomicLong());
                 consumer.updatePartitions(topicName, Collections.singletonList(
                     new PartitionInfo(topicName, 0, null, null, null)));
                 consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
@@ -382,11 +381,11 @@ public class TopologyTestDriver implements Closeable {
             globalStateTask = null;
         }
 
-        if (!partitionsByInputTopic.isEmpty()) {
+        if (!partitionsByTopic.isEmpty()) {
             final LogContext logContext = new LogContext("topology-test-driver ");
             final ProcessorStateManager stateManager = new ProcessorStateManager(
                 TASK_ID,
-                new HashSet<>(partitionsByInputTopic.values()),
+                new HashSet<>(partitionsByTopic.values()),
                 Task.TaskType.ACTIVE,
                 stateDirectory,
                 processorTopology.storeToChangelogTopic(),
@@ -406,7 +405,7 @@ public class TopologyTestDriver implements Closeable {
                 taskId -> producer);
             task = new StreamTask(
                 TASK_ID,
-                new HashSet<>(partitionsByInputTopic.values()),
+                new HashSet<>(partitionsByTopic.values()),
                 processorTopology,
                 consumer,
                 streamsConfig,
@@ -430,20 +429,6 @@ public class TopologyTestDriver implements Closeable {
         eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
     }
 
-    private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) {
-        final Long taskIdleTime = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
-        if (taskIdleTime > 0) {
-            log.info("Detected {} config in use with TopologyTestDriver (set to {}ms)." +
-                         " This means you might need to use TopologyTestDriver#advanceWallClockTime()" +
-                         " or enqueue records on all partitions to allow Steams to make progress." +
-                         " TopologyTestDriver will log a message each time it cannot process enqueued" +
-                         " records due to {}.",
-                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
-                     taskIdleTime,
-                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
-        }
-    }
-
     /**
      * Get read-only handle on global metrics registry.
      *
@@ -471,114 +456,77 @@ public class TopologyTestDriver implements Closeable {
             consumerRecord.headers());
     }
 
+    private void pipeRecord(final ProducerRecord<byte[], byte[]> record) {
+        pipeRecord(record.topic(), record.timestamp(), record.key(), record.value(), record.headers());
+    }
+
     private void pipeRecord(final String topicName,
-                            final long timestamp,
+                            final Long timestamp,
                             final byte[] key,
                             final byte[] value,
                             final Headers headers) {
-        final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(topicName);
-        final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(topicName);
 
-        if (inputTopicOrPatternPartition == null && globalInputTopicPartition == null) {
-            throw new IllegalArgumentException("Unknown topic: " + topicName);
-        }
-
-        if (inputTopicOrPatternPartition != null) {
-            enqueueTaskRecord(topicName, inputTopicOrPatternPartition, timestamp, key, value, headers);
-            completeAllProcessableWork();
-        }
-
-        if (globalInputTopicPartition != null) {
-            processGlobalRecord(globalInputTopicPartition, timestamp, key, value, headers);
+        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
+            validateSourceTopicNameRegexPattern(topicName);
         }
-    }
-
-    private void enqueueTaskRecord(final String inputTopic,
-                                   final TopicPartition topicOrPatternPartition,
-                                   final long timestamp,
-                                   final byte[] key,
-                                   final byte[] value,
-                                   final Headers headers) {
-        task.addRecords(topicOrPatternPartition, Collections.singleton(new ConsumerRecord<>(
-            inputTopic,
-            topicOrPatternPartition.partition(),
-            offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1,
-            timestamp,
-            TimestampType.CREATE_TIME,
-            (long) ConsumerRecord.NULL_CHECKSUM,
-            key == null ? ConsumerRecord.NULL_SIZE : key.length,
-            value == null ? ConsumerRecord.NULL_SIZE : value.length,
-            key,
-            value,
-            headers)));
-    }
-
-    private void completeAllProcessableWork() {
-        // for internally triggered processing (like wall-clock punctuations),
-        // we might have buffered some records to internal topics that need to
-        // be piped back in to kick-start the processing loop. This is idempotent
-        // and therefore harmless in the case where all we've done is enqueued an
-        // input record from the user.
-        captureOutputsAndReEnqueueInternalResults();
-
-        // If the topology only has global tasks, then `task` would be null.
-        // For this method, it just means there's nothing to do.
-        if (task != null) {
-            while (task.hasRecordsQueued() && task.isProcessable(mockWallClockTime.milliseconds())) {
-                // Process the record ...
-                task.process(mockWallClockTime.milliseconds());
-                task.maybePunctuateStreamTime();
-                task.commit();
-                captureOutputsAndReEnqueueInternalResults();
-            }
-            if (task.hasRecordsQueued()) {
-                log.info("Due to the {} configuration, there are currently some records" +
-                             " that cannot be processed. Advancing wall-clock time or" +
-                             " enqueuing records on the empty topics will allow" +
-                             " Streams to process more.",
-                         StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        final TopicPartition topicPartition = getTopicPartition(topicName);
+        if (topicPartition != null) {
+            final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
+            task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
+                    topicName,
+                    topicPartition.partition(),
+                    offset,
+                    timestamp,
+                    TimestampType.CREATE_TIME,
+                    (long) ConsumerRecord.NULL_CHECKSUM,
+                    key == null ? ConsumerRecord.NULL_SIZE : key.length,
+                    value == null ? ConsumerRecord.NULL_SIZE : value.length,
+                    key,
+                    value,
+                    headers)));
+
+            // Process the record ...
+            task.process(mockWallClockTime.milliseconds());
+            task.maybePunctuateStreamTime();
+            task.commit();
+            captureOutputRecords();
+        } else {
+            final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
+            if (globalTopicPartition == null) {
+                throw new IllegalArgumentException("Unknown topic: " + topicName);
             }
+            final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1;
+            globalStateTask.update(new ConsumerRecord<>(
+                    globalTopicPartition.topic(),
+                    globalTopicPartition.partition(),
+                    offset,
+                    timestamp,
+                    TimestampType.CREATE_TIME,
+                    (long) ConsumerRecord.NULL_CHECKSUM,
+                    key == null ? ConsumerRecord.NULL_SIZE : key.length,
+                    value == null ? ConsumerRecord.NULL_SIZE : value.length,
+                    key,
+                    value,
+                    headers));
+            globalStateTask.flushState();
         }
     }
 
-    private void processGlobalRecord(final TopicPartition globalInputTopicPartition,
-                                     final long timestamp,
-                                     final byte[] key,
-                                     final byte[] value,
-                                     final Headers headers) {
-        globalStateTask.update(new ConsumerRecord<>(
-            globalInputTopicPartition.topic(),
-            globalInputTopicPartition.partition(),
-            offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).incrementAndGet() - 1,
-            timestamp,
-            TimestampType.CREATE_TIME,
-            (long) ConsumerRecord.NULL_CHECKSUM,
-            key == null ? ConsumerRecord.NULL_SIZE : key.length,
-            value == null ? ConsumerRecord.NULL_SIZE : value.length,
-            key,
-            value,
-            headers));
-        globalStateTask.flushState();
-    }
 
     private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
         for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) {
             if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
                 throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
-                                                " cannot contain regex pattern for input record topic: " + inputRecordTopic +
-                                                " and hence cannot process the message.");
+                        " cannot contain regex pattern for input record topic: " + inputRecordTopic +
+                        " and hence cannot process the message.");
             }
         }
     }
 
-    private TopicPartition getInputTopicOrPatternPartition(final String topicName) {
-        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
-            validateSourceTopicNameRegexPattern(topicName);
-        }
-
-        final TopicPartition topicPartition = partitionsByInputTopic.get(topicName);
+    private TopicPartition getTopicPartition(final String topicName) {
+        final TopicPartition topicPartition = partitionsByTopic.get(topicName);
         if (topicPartition == null) {
-            for (final Map.Entry<String, TopicPartition> entry : partitionsByInputTopic.entrySet()) {
+            for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) {
                 if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
                     return entry.getValue();
                 }
@@ -587,7 +535,7 @@ public class TopologyTestDriver implements Closeable {
         return topicPartition;
     }
 
-    private void captureOutputsAndReEnqueueInternalResults() {
+    private void captureOutputRecords() {
         // Capture all the records sent to the producer ...
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
         producer.clear();
@@ -600,27 +548,9 @@ public class TopologyTestDriver implements Closeable {
 
             // Forward back into the topology if the produced record is to an internal or a source topic ...
             final String outputTopicName = record.topic();
-
-            final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(outputTopicName);
-            final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(outputTopicName);
-
-            if (inputTopicOrPatternPartition != null) {
-                enqueueTaskRecord(
-                    outputTopicName,
-                    inputTopicOrPatternPartition,
-                    record.timestamp(),
-                    record.key(),
-                    record.value(),
-                    record.headers());
-            }
-
-            if (globalInputTopicPartition != null) {
-                processGlobalRecord(
-                    globalInputTopicPartition,
-                    record.timestamp(),
-                    record.key(),
-                    record.value(),
-                    record.headers());
+            if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
+                || globalPartitionsByTopic.containsKey(outputTopicName)) {
+                pipeRecord(record);
             }
         }
     }
@@ -667,7 +597,7 @@ public class TopologyTestDriver implements Closeable {
             task.maybePunctuateSystemTime();
             task.commit();
         }
-        completeAllProcessableWork();
+        captureOutputRecords();
     }
 
     /**
@@ -917,23 +847,23 @@ public class TopologyTestDriver implements Closeable {
     private void throwIfBuiltInStore(final StateStore stateStore) {
         if (stateStore instanceof TimestampedKeyValueStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                                                   + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
+                + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
         }
         if (stateStore instanceof ReadOnlyKeyValueStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                                                   + " is a key-value store and should be accessed via `getKeyValueStore()`");
+                + " is a key-value store and should be accessed via `getKeyValueStore()`");
         }
         if (stateStore instanceof TimestampedWindowStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                                                   + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
+                + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
         }
         if (stateStore instanceof ReadOnlyWindowStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                                                   + " is a window store and should be accessed via `getWindowStore()`");
+                + " is a window store and should be accessed via `getWindowStore()`");
         }
         if (stateStore instanceof ReadOnlySessionStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                                                   + " is a session store and should be accessed via `getSessionStore()`");
+                + " is a session store and should be accessed via `getSessionStore()`");
         }
     }
 
@@ -1079,12 +1009,7 @@ public class TopologyTestDriver implements Closeable {
                 // ignore
             }
         }
-        completeAllProcessableWork();
-        if (task != null && task.hasRecordsQueued()) {
-            log.warn("Found some records that cannot be processed due to the" +
-                         " {} configuration during TopologyTestDriver#close().",
-                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
-        }
+        captureOutputRecords();
         if (!eosEnabled) {
             producer.close();
         }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 60ab516..d7ac6b4 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -41,7 +40,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -75,8 +73,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -149,7 +145,7 @@ public class TopologyTestDriverTest {
         private final String topic;
         private final Headers headers;
 
-        Record(final ConsumerRecord<byte[], byte[]> consumerRecord,
+        Record(final ConsumerRecord consumerRecord,
                final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
@@ -160,7 +156,7 @@ public class TopologyTestDriverTest {
         }
 
         Record(final String newTopic,
-               final TestRecord<byte[], byte[]> consumerRecord,
+               final TestRecord consumerRecord,
                final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
@@ -235,7 +231,7 @@ public class TopologyTestDriverTest {
         }
     }
 
-    private final static class MockProcessor implements Processor<Object, Object> {
+    private final static class MockProcessor implements Processor {
         private final Collection<Punctuation> punctuations;
         private ProcessorContext context;
 
@@ -270,7 +266,7 @@ public class TopologyTestDriverTest {
 
     private final List<MockProcessor> mockProcessors = new ArrayList<>();
 
-    private final class MockProcessorSupplier implements ProcessorSupplier<Object, Object> {
+    private final class MockProcessorSupplier implements ProcessorSupplier {
         private final Collection<Punctuation> punctuations;
 
         private MockProcessorSupplier() {
@@ -282,7 +278,7 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public Processor<Object, Object> get() {
+        public Processor get() {
             final MockProcessor mockProcessor = new MockProcessor(punctuations);
             mockProcessors.add(mockProcessor);
             return mockProcessor;
@@ -456,7 +452,7 @@ public class TopologyTestDriverTest {
         testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
-        final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+        final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
 
         assertEquals(key1, outputRecord.key());
         assertEquals(value1, outputRecord.value());
@@ -709,7 +705,7 @@ public class TopologyTestDriverTest {
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
-        ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+        ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
         assertEquals(key1, outputRecord.key());
         assertEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_1, outputRecord.topic());
@@ -1213,7 +1209,7 @@ public class TopologyTestDriverTest {
         testDriver.pipeRecord(topic, new TestRecord<>(key, value, null, time),
                 new StringSerializer(), new LongSerializer(), null);
     }
-
+    
     private void compareKeyValue(final TestRecord<String, Long> record, final String key, final Long value) {
         assertThat(record.getKey(), equalTo(key));
         assertThat(record.getValue(), equalTo(value));
@@ -1341,9 +1337,9 @@ public class TopologyTestDriverTest {
         topology.addSource("sourceProcessor", "input-topic");
         topology.addProcessor(
             "storeProcessor",
-            new ProcessorSupplier<String, Long>() {
+            new ProcessorSupplier() {
                 @Override
-                public Processor<String, Long> get() {
+                public Processor get() {
                     return new Processor<String, Long>() {
                         private KeyValueStore<String, Long> store;
 
@@ -1476,7 +1472,7 @@ public class TopologyTestDriverTest {
         testDriver = new TopologyTestDriver(topology, config);
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
-        final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
+        final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1);
         assertEquals(key1, outputRecord.key());
         assertEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_1, outputRecord.topic());
@@ -1526,174 +1522,4 @@ public class TopologyTestDriverTest {
         final TaskId taskId = new TaskId(0, 0);
         assertTrue(new File(appDir, taskId.toString()).exists());
     }
-
-    @Test
-    public void shouldEnqueueLaterOutputsAfterEarlierOnes() {
-        final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
-        final Topology topology = new Topology();
-        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
-        topology.addProcessor(
-            "recursiveProcessor",
-            () -> new AbstractProcessor<String, String>() {
-                @Override
-                public void process(final String key, final String value) {
-                    if (!value.startsWith("recurse-")) {
-                        context().forward(key, "recurse-" + value, To.child("recursiveSink"));
-                    }
-                    context().forward(key, value, To.child("sink"));
-                }
-            },
-            "source"
-        );
-        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
-        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
-
-        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
-            final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
-            final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
-
-            // given the topology above, we expect to see the output _first_ echo the input
-            // and _then_ print it with "recurse-" prepended.
-
-            in.pipeInput("B", "beta");
-            final List<KeyValue<String, String>> events = out.readKeyValuesToList();
-            assertThat(
-                events,
-                is(Arrays.asList(
-                    new KeyValue<>("B", "beta"),
-                    new KeyValue<>("B", "recurse-beta")
-                ))
-            );
-
-        }
-    }
-
-    @Test
-    public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() {
-        final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
-        final Topology topology = new Topology();
-        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input");
-        topology.addGlobalStore(
-            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("globule-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(),
-            "globuleSource",
-            new StringDeserializer(),
-            new StringDeserializer(),
-            "globule-topic",
-            "globuleProcessor",
-            () -> new Processor<String, String>() {
-                private KeyValueStore<String, String> stateStore;
-
-                @SuppressWarnings("unchecked")
-                @Override
-                public void init(final ProcessorContext context) {
-                    stateStore = (KeyValueStore<String, String>) context.getStateStore("globule-store");
-                }
-
-                @Override
-                public void process(final String key, final String value) {
-                    stateStore.put(key, value);
-                }
-
-                @Override
-                public void close() {
-
-                }
-            }
-        );
-        topology.addProcessor(
-            "recursiveProcessor",
-            () -> new AbstractProcessor<String, String>() {
-                @Override
-                public void process(final String key, final String value) {
-                    if (!value.startsWith("recurse-")) {
-                        context().forward(key, "recurse-" + value, To.child("recursiveSink"));
-                    }
-                    context().forward(key, value, To.child("sink"));
-                    context().forward(key, value, To.child("globuleSink"));
-                }
-            },
-            "source"
-        );
-        topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
-        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
-        topology.addSink("globuleSink", "globule-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor");
-
-        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
-            final TestInputTopic<String, String> in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer());
-            final TestOutputTopic<String, String> globalTopic = topologyTestDriver.createOutputTopic("globule-topic", new StringDeserializer(), new StringDeserializer());
-
-            in.pipeInput("A", "alpha");
-
-            // expect the global store to correctly reflect the last update
-            final KeyValueStore<String, String> keyValueStore = topologyTestDriver.getKeyValueStore("globule-store");
-            assertThat(keyValueStore, notNullValue());
-            assertThat(keyValueStore.get("A"), is("recurse-alpha"));
-
-            // and also just make sure the test really sent both events to the topic.
-            final List<KeyValue<String, String>> events = globalTopic.readKeyValuesToList();
-            assertThat(
-                events,
-                is(Arrays.asList(
-                    new KeyValue<>("A", "alpha"),
-                    new KeyValue<>("A", "recurse-alpha")
-                ))
-            );
-        }
-    }
-
-    @Test
-    public void shouldRespectTaskIdling() {
-        final Properties properties = new Properties();
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-
-        // This is the key to this test. Wall-clock time doesn't advance automatically in TopologyTestDriver,
-        // so with an idle time specified, TTD can't just expect all enqueued records to be processable.
-        properties.setProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "1000");
-
-        final Topology topology = new Topology();
-        topology.addSource("source1", new StringDeserializer(), new StringDeserializer(), "input1");
-        topology.addSource("source2", new StringDeserializer(), new StringDeserializer(), "input2");
-        topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "source1", "source2");
-
-        try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) {
-            final TestInputTopic<String, String> in1 = topologyTestDriver.createInputTopic("input1", new StringSerializer(), new StringSerializer());
-            final TestInputTopic<String, String> in2 = topologyTestDriver.createInputTopic("input2", new StringSerializer(), new StringSerializer());
-            final TestOutputTopic<String, String> out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
-
-            in1.pipeInput("A", "alpha");
-            topologyTestDriver.advanceWallClockTime(Duration.ofMillis(1));
-
-            // only one input has records, and it's only been one ms
-            assertThat(out.readKeyValuesToList(), is(Collections.emptyList()));
-
-            in2.pipeInput("B", "beta");
-
-            // because both topics have records, we can process (even though it's only been one ms)
-            // but after processing A (the earlier record), we now only have one input queued, so
-            // task idling takes effect again
-            assertThat(
-                out.readKeyValuesToList(),
-                is(Collections.singletonList(
-                    new KeyValue<>("A", "alpha")
-                ))
-            );
-
-            topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(1));
-
-            // now that one second has elapsed, the idle time has expired, and we can process B
-            assertThat(
-                out.readKeyValuesToList(),
-                is(Collections.singletonList(
-                    new KeyValue<>("B", "beta")
-                ))
-            );
-        }
-    }
 }
diff --git a/streams/test-utils/src/test/resources/log4j.properties b/streams/test-utils/src/test/resources/log4j.properties
deleted file mode 100644
index be36f90..0000000
--- a/streams/test-utils/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-
-log4j.logger.org.apache.kafka=INFO