You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/04 15:42:06 UTC

[kafka] branch trunk updated: MINOR: Removed deprecated schedule function (#4908)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af98326  MINOR: Removed deprecated schedule function (#4908)
af98326 is described below

commit af983267be7a2d0f81527f5a348af377f30caee4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri May 4 08:42:01 2018 -0700

    MINOR: Removed deprecated schedule function (#4908)
    
    While working on this, I also refactored the MockProcessor out of the MockProcessorSupplier to clean up the unit test paths.
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../examples/wordcount/WordCountProcessorDemo.java |   4 -
 .../kafka/streams/kstream/KGroupedStream.java      |   4 +-
 .../kafka/streams/kstream/TimeWindowedKStream.java |   4 +-
 .../apache/kafka/streams/kstream/Transformer.java  |  21 ----
 .../kstream/internals/KStreamTransform.java        |   9 --
 .../kstream/internals/KStreamTransformValues.java  |  14 ---
 .../kafka/streams/processor/AbstractProcessor.java |  17 +--
 .../apache/kafka/streams/processor/Processor.java  |  11 --
 .../kafka/streams/processor/ProcessorContext.java  |  12 ---
 .../internals/GlobalProcessorContextImpl.java      |  10 --
 .../processor/internals/ProcessorContextImpl.java  |  11 --
 .../streams/processor/internals/ProcessorNode.java |   4 +-
 .../processor/internals/StandbyContextImpl.java    |   9 --
 .../apache/kafka/streams/StreamsBuilderTest.java   |   8 +-
 .../org/apache/kafka/streams/TopologyTest.java     |   4 -
 .../streams/integration/EosIntegrationTest.java    |   5 -
 .../integration/RestoreIntegrationTest.java        |   9 +-
 .../kafka/streams/kstream/KStreamBuilderTest.java  |   8 +-
 .../kstream/internals/AbstractStreamTest.java      |   6 +-
 .../kstream/internals/KStreamBranchTest.java       |  16 +--
 .../kstream/internals/KStreamFilterTest.java       |  16 +--
 .../kstream/internals/KStreamFlatMapTest.java      |  10 +-
 .../internals/KStreamFlatMapValuesTest.java        |  12 +--
 .../internals/KStreamGlobalKTableJoinTest.java     |  12 ++-
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  16 +--
 .../streams/kstream/internals/KStreamImplTest.java |  27 +++--
 .../kstream/internals/KStreamKStreamJoinTest.java  |  40 +++----
 .../internals/KStreamKStreamLeftJoinTest.java      |  17 +--
 .../kstream/internals/KStreamKTableJoinTest.java   |  13 ++-
 .../internals/KStreamKTableLeftJoinTest.java       |  10 +-
 .../streams/kstream/internals/KStreamMapTest.java  |  10 +-
 .../kstream/internals/KStreamMapValuesTest.java    |  13 +--
 .../kstream/internals/KStreamSelectKeyTest.java    |   8 +-
 .../kstream/internals/KStreamTransformTest.java    | 120 ++++++++++-----------
 .../internals/KStreamTransformValuesTest.java      |  19 ++--
 .../internals/KStreamWindowAggregateTest.java      |  44 ++++----
 .../kstream/internals/KTableAggregateTest.java     |  46 ++++----
 .../kstream/internals/KTableFilterTest.java        |  81 +++++++-------
 .../streams/kstream/internals/KTableImplTest.java  |  24 ++---
 .../internals/KTableKTableInnerJoinTest.java       |  41 +++----
 .../internals/KTableKTableLeftJoinTest.java        |  24 +++--
 .../internals/KTableKTableOuterJoinTest.java       |  25 +++--
 .../kstream/internals/KTableMapKeysTest.java       |   8 +-
 .../kstream/internals/KTableMapValuesTest.java     |  31 +++---
 .../kstream/internals/KTableSourceTest.java        |  20 ++--
 .../streams/processor/TopologyBuilderTest.java     |   4 -
 .../internals/AbstractProcessorContextTest.java    |   3 -
 .../processor/internals/GlobalStateTaskTest.java   |   4 +-
 .../internals/InternalTopologyBuilderTest.java     |   3 -
 .../processor/internals/ProcessorNodeTest.java     |  10 --
 .../processor/internals/ProcessorTopologyTest.java |  37 +------
 .../processor/internals/PunctuationQueueTest.java  |  96 ++++++-----------
 .../processor/internals/StreamTaskTest.java        |  18 ++--
 .../processor/internals/StreamThreadTest.java      |   4 -
 .../kafka/test/InternalMockProcessorContext.java   |   3 -
 .../org/apache/kafka/test/KStreamTestDriver.java   |  15 ---
 ...ckProcessorSupplier.java => MockProcessor.java} |  87 ++++++---------
 .../org/apache/kafka/test/MockProcessorNode.java   |  23 ++--
 .../apache/kafka/test/MockProcessorSupplier.java   |  85 +++------------
 .../apache/kafka/test/NoOpProcessorContext.java    |   4 -
 .../streams/scala/kstream/KGroupedStream.scala     |   1 -
 .../kafka/streams/scala/kstream/KStream.scala      |   8 --
 .../scala/kstream/TimeWindowedKStream.scala        |   1 -
 .../streams/processor/MockProcessorContext.java    |   8 --
 .../kafka/streams/MockProcessorContextTest.java    |   4 -
 .../kafka/streams/TopologyTestDriverTest.java      |  14 ---
 66 files changed, 490 insertions(+), 815 deletions(-)

diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index dbf2b70..523bb46 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -98,10 +98,6 @@ public class WordCountProcessorDemo {
                 }
 
                 @Override
-                @Deprecated
-                public void punctuate(long timestamp) {}
-
-                @Override
                 public void close() {}
             };
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 29de64c..d8589e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -177,7 +176,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
      * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
-     *                      Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided
+     *                      Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()}
+     *                      if there is no valueSerde provided
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 8ef0bd7..7f9752b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -101,7 +100,8 @@ public interface TimeWindowedKStream<K, V> {
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      ** @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
-     *                      Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided
+     *                       Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()}
+     *                       if there is no valueSerde provided
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index a83b4a3..bbf8c25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.To;
 
 /**
@@ -82,26 +81,6 @@ public interface Transformer<K, V, R> {
     R transform(final K key, final V value);
 
     /**
-     * Perform any periodic operations and possibly generate new {@link KeyValue} pairs if this processor
-     * {@link ProcessorContext#schedule(long) schedules itself} with the context during
-     * {@link #init(ProcessorContext) initialization}.
-     * <p>
-     * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
-     * {@link ProcessorContext#forward(Object, Object, To)} can be used.
-     * <p>
-     * Note that {@code punctuate} is called based on <it>stream time</it> (i.e., time progresses with regard to
-     * timestamps return by the used {@link TimestampExtractor})
-     * and not based on wall-clock time.
-     *
-     * @deprecated Please use {@link Punctuator} functional interface instead.
-     *
-     * @param timestamp the stream time when {@code punctuate} is being called
-     * @return new {@link KeyValue} pair to be forwarded to down stream&mdash;if {@code null} will not be forwarded
-     */
-    @Deprecated
-    R punctuate(final long timestamp);
-
-    /**
      * Close this processor and clean up any resources.
      * <p>
      * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 0afadbb..1ae8ede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -59,15 +59,6 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
                 context().forward(pair.key, pair.value);
         }
 
-        @SuppressWarnings("deprecation")
-        @Override
-        public void punctuate(long timestamp) {
-            KeyValue<? extends K2, ? extends V2> pair = transformer.punctuate(timestamp);
-
-            if (pair != null)
-                context().forward(pair.key, pair.value);
-        }
-
         @Override
         public void close() {
             transformer.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index e644597..d09fae2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -106,12 +106,6 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
                         return context.schedule(interval, type, callback);
                     }
 
-                    @SuppressWarnings("deprecation")
-                    @Override
-                    public void schedule(final long interval) {
-                        context.schedule(interval);
-                    }
-
                     @Override
                     public <K, V> void forward(final K key, final V value) {
                         throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
@@ -177,14 +171,6 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
             context.forward(key, valueTransformer.transform(key, value));
         }
 
-        @SuppressWarnings("deprecation")
-        @Override
-        public void punctuate(long timestamp) {
-            if (valueTransformer.punctuate(timestamp) != null) {
-                throw new StreamsException("ValueTransformer#punctuate must return null.");
-            }
-        }
-
         @Override
         public void close() {
             valueTransformer.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 14e6c2a..83abfca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor;
 
 /**
  * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
- * implementations of {@link #punctuate(long)} and {@link #close()}.
+ * implementation of {@link #close()}.
  *
  * @param <K> the type of keys
  * @param <V> the type of values
@@ -36,21 +36,6 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
     }
 
     /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
-     * during {@link #init(ProcessorContext) initialization}.
-     * <p>
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     * </p>
-     *
-     * @param timestamp the wallclock time when this method is being called
-     */
-    @SuppressWarnings("deprecation")
-    @Override
-    public void punctuate(final long timestamp) {
-        // do nothing
-    }
-
-    /**
      * Close this processor and clean up any resources.
      * <p>
      * This method does nothing by default; if desired, subclasses should override it with custom functionality.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index 2ed17df..bcdb2f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -48,17 +48,6 @@ public interface Processor<K, V> {
     void process(K key, V value);
 
     /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
-     * during {@link #init(ProcessorContext) initialization}.
-     *
-     * @deprecated Please use {@link Punctuator} functional interface instead.
-     * 
-     * @param timestamp the stream time when this method is being called
-     */
-    @Deprecated
-    void punctuate(long timestamp);
-
-    /**
      * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
      * Thus, it is not possible to write anything to Kafka as underlying clients are already closed.
      * <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 404b225..93a1455 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -132,18 +132,6 @@ public interface ProcessorContext {
                          final Punctuator callback);
 
     /**
-     * Schedules a periodic operation for processors. A processor may call this method during
-     * {@link Processor#init(ProcessorContext) initialization} to
-     * schedule a periodic call - called a punctuation - to {@link Processor#punctuate(long)}.
-     *
-     * @deprecated Please use {@link #schedule(long, PunctuationType, Punctuator)} instead.
-     *
-     * @param interval the time interval between punctuations
-     */
-    @Deprecated
-    void schedule(final long interval);
-
-    /**
      * Forwards a key/value pair to all downstream processors.
      * Used the input record's timestamp as timestamp for the output record.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 6bc4121..717e6a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -96,14 +96,4 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
     }
 
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @SuppressWarnings("deprecation")
-    @Override
-    public void schedule(long interval) {
-        throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 178937f..a539a1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -152,15 +152,4 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return task.schedule(interval, type, callback);
     }
 
-    @Override
-    @Deprecated
-    public void schedule(final long interval) {
-        schedule(interval, PunctuationType.STREAM_TIME, new Punctuator() {
-            @Override
-            public void punctuate(final long timestamp) {
-                currentNode().processor().punctuate(timestamp);
-            }
-        });
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 0854b67..a0a7041 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -37,9 +37,9 @@ public class ProcessorNode<K, V> {
     private final List<ProcessorNode<?, ?>> children;
     private final Map<String, ProcessorNode<?, ?>> childByName;
 
-    private final String name;
-    private final Processor<K, V> processor;
     private NodeMetrics nodeMetrics;
+    private final Processor<K, V> processor;
+    private final String name;
     private final Time time;
 
     private K key;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ef4585a..6aeca44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -188,15 +188,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    @Deprecated
-    public void schedule(final long interval) {
-        throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
     public RecordContext recordContext() {
         throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index d3e01fa..15e55d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -189,7 +189,7 @@ public class StreamsBuilderTest {
         driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
         // no exception was thrown
-        assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -208,8 +208,8 @@ public class StreamsBuilderTest {
         final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
         driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
-        assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
-        assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
+        assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
     }
     
     @Test
@@ -232,7 +232,7 @@ public class StreamsBuilderTest {
         driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
         driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
 
-        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index eee3386..0c34723 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -296,10 +296,6 @@ public class TopologyTest {
                 @Override
                 public void process(Object key, Object value) { }
 
-                @SuppressWarnings("deprecation")
-                @Override
-                public void punctuate(long timestamp) { }
-
                 @Override
                 public void close() { }
             };
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index c4ea964..30c90c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -644,11 +644,6 @@ public class EosIntegrationTest {
                     }
 
                     @Override
-                    public KeyValue<Long, Long> punctuate(final long timestamp) {
-                        return null;
-                    }
-
-                    @Override
                     public void close() { }
                 };
             } }, storeNames)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 19ddedf..12b0d97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -57,7 +57,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -106,13 +105,12 @@ public class RestoreIntegrationTest {
     }
 
     @After
-    public void shutdown() throws IOException {
+    public void shutdown() {
         if (kafkaStreams != null) {
             kafkaStreams.close(30, TimeUnit.SECONDS);
         }
     }
 
-
     @Test
     public void shouldRestoreState() throws ExecutionException, InterruptedException {
         final AtomicInteger numReceived = new AtomicInteger(0);
@@ -276,11 +274,6 @@ public class RestoreIntegrationTest {
         }
 
         @Override
-        public void punctuate(final long timestamp) {
-
-        }
-
-        @Override
         public void close() {
 
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 81bdb31..b63f2de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -117,7 +117,7 @@ public class KStreamBuilderTest {
         driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
         // no exception was thrown
-        assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -134,8 +134,8 @@ public class KStreamBuilderTest {
         driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
         driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
 
-        assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
-        assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
+        assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -170,7 +170,7 @@ public class KStreamBuilderTest {
         driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
         driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
 
-        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 2aa07f3..1f9bcba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -76,12 +76,12 @@ public class AbstractStreamTest {
     public void testShouldBeExtensible() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
-        final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         final String topicName = "topic";
 
         ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())));
 
-        stream.randomFilter().process(processor);
+        stream.randomFilter().process(supplier);
 
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test");
@@ -94,7 +94,7 @@ public class AbstractStreamTest {
             driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
-        assertTrue(processor.processed.size() <= expectedKeys.length);
+        assertTrue(supplier.theCapturedProcessor().processed.size() <= expectedKeys.length);
     }
 
     private class ExtendedKStream<K, V> extends AbstractStream<K> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index a70bc37..bd3d60b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -26,13 +26,14 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Array;
+import java.util.List;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -90,17 +91,15 @@ public class KStreamBranchTest {
 
         KStream<Integer, String> stream;
         KStream<Integer, String>[] branches;
-        MockProcessorSupplier<Integer, String>[] processors;
 
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         branches = stream.branch(isEven, isMultipleOfThree, isOdd);
 
         assertEquals(3, branches.length);
 
-        processors = (MockProcessorSupplier<Integer, String>[]) Array.newInstance(MockProcessorSupplier.class, branches.length);
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         for (int i = 0; i < branches.length; i++) {
-            processors[i] = new MockProcessorSupplier<>();
-            branches[i].process(processors[i]);
+            branches[i].process(supplier);
         }
 
         driver = new TopologyTestDriver(builder.build(), props);
@@ -108,9 +107,10 @@ public class KStreamBranchTest {
             driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
-        assertEquals(3, processors[0].processed.size());
-        assertEquals(1, processors[1].processed.size());
-        assertEquals(2, processors[2].processed.size());
+        final List<MockProcessor<Integer, String>> processors = supplier.capturedProcessors(3);
+        assertEquals(3, processors.get(0).processed.size());
+        assertEquals(1, processors.get(1).processed.size());
+        assertEquals(2, processors.get(2).processed.size());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index a67d688..d338fe3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -74,18 +74,18 @@ public class KStreamFilterTest {
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, String> processor;
+        MockProcessorSupplier<Integer, String> supplier;
 
-        processor = new MockProcessorSupplier<>();
+        supplier = new MockProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
-        stream.filter(isMultipleOfThree).process(processor);
+        stream.filter(isMultipleOfThree).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
-        assertEquals(2, processor.processed.size());
+        assertEquals(2, supplier.theCapturedProcessor().processed.size());
     }
 
     @Test
@@ -94,18 +94,18 @@ public class KStreamFilterTest {
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, String> processor;
+        MockProcessorSupplier<Integer, String> supplier;
 
-        processor = new MockProcessorSupplier<>();
+        supplier = new MockProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
-        stream.filterNot(isMultipleOfThree).process(processor);
+        stream.filterNot(isMultipleOfThree).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
-        assertEquals(5, processor.processed.size());
+        assertEquals(5, supplier.theCapturedProcessor().processed.size());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index e414218..9ce24b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -82,23 +82,23 @@ public class KStreamFlatMapTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         KStream<Integer, String> stream;
-        MockProcessorSupplier<String, String> processor;
+        MockProcessorSupplier<String, String> supplier;
 
-        processor = new MockProcessorSupplier<>();
+        supplier = new MockProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
-        stream.flatMap(mapper).process(processor);
+        stream.flatMap(mapper).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
-        assertEquals(6, processor.processed.size());
+        assertEquals(6, supplier.theCapturedProcessor().processed.size());
 
         String[] expected = {"10:V1", "20:V2", "21:V2", "30:V3", "31:V3", "32:V3"};
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+            assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index 14213c9..221b02b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -80,8 +80,8 @@ public class KStreamFlatMapValuesTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
-        final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
-        stream.flatMapValues(mapper).process(processor);
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        stream.flatMapValues(mapper).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (final int expectedKey : expectedKeys) {
@@ -91,7 +91,7 @@ public class KStreamFlatMapValuesTest {
 
         String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"};
 
-        assertArrayEquals(expected, processor.processed.toArray());
+        assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 
 
@@ -113,9 +113,9 @@ public class KStreamFlatMapValuesTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
-        final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
 
-        stream.flatMapValues(mapper).process(processor);
+        stream.flatMapValues(mapper).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (final int expectedKey : expectedKeys) {
@@ -125,6 +125,6 @@ public class KStreamFlatMapValuesTest {
 
         String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"};
 
-        assertArrayEquals(expected, processor.processed.toArray());
+        assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 2936f5f..6e5b816 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -36,7 +37,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.Set;
@@ -50,19 +50,19 @@ public class KStreamGlobalKTableJoinTest {
     private final Serde<Integer> intSerde = Serdes.Integer();
     private final Serde<String> stringSerde = Serdes.String();
     private TopologyTestDriver driver;
-    private MockProcessorSupplier<Integer, String> processor;
+    private MockProcessor<Integer, String> processor;
     private final int[] expectedKeys = {0, 1, 2, 3};
     private StreamsBuilder builder;
 
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
 
         builder = new StreamsBuilder();
         final KStream<Integer, String> stream;
         final GlobalKTable<String, String> table; // value of stream optionally contains key of table
         final KeyValueMapper<Integer, String, String> keyMapper;
 
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
         final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
         stream = builder.stream(streamTopic, streamConsumed);
@@ -76,7 +76,7 @@ public class KStreamGlobalKTableJoinTest {
                 return tokens.length > 1 ? tokens[1] : null;
             }
         };
-        stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+        stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test");
@@ -86,6 +86,8 @@ public class KStreamGlobalKTableJoinTest {
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
         driver = new TopologyTestDriver(builder.build(), props);
+
+        processor = supplier.theCapturedProcessor();
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 8882113..b3551ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -29,13 +29,13 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.Set;
@@ -48,20 +48,22 @@ public class KStreamGlobalKTableLeftJoinTest {
     final private String globalTableTopic = "globalTableTopic";
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
+
+    private MockProcessor<Integer, String> processor;
     private TopologyTestDriver driver;
-    private MockProcessorSupplier<Integer, String> processor;
-    private final int[] expectedKeys = {0, 1, 2, 3};
     private StreamsBuilder builder;
 
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
     @Before
-    public void setUp() throws IOException {
+    public void setUp() {
 
         builder = new StreamsBuilder();
         final KStream<Integer, String> stream;
         final GlobalKTable<String, String> table; // value of stream optionally contains key of table
         final KeyValueMapper<Integer, String, String> keyMapper;
 
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         final Consumed<Integer, String> streamConsumed = Consumed.with(intSerde, stringSerde);
         final Consumed<String, String> tableConsumed = Consumed.with(stringSerde, stringSerde);
         stream = builder.stream(streamTopic, streamConsumed);
@@ -75,7 +77,7 @@ public class KStreamGlobalKTableLeftJoinTest {
                 return tokens.length > 1 ? tokens[1] : null;
             }
         };
-        stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor);
+        stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test");
@@ -85,6 +87,8 @@ public class KStreamGlobalKTableLeftJoinTest {
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
         driver = new TopologyTestDriver(builder.build(), props);
+
+        processor = supplier.theCapturedProcessor();
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index f397246..797575d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -69,12 +69,15 @@ import static org.junit.Assert.fail;
 
 public class KStreamImplTest {
 
-    final private Serde<String> stringSerde = Serdes.String();
-    final private Serde<Integer> intSerde = Serdes.Integer();
+    private final Serde<String> stringSerde = Serdes.String();
+    private final Serde<Integer> intSerde = Serdes.Integer();
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
+
+    private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+
     private KStream<String, String> testStream;
     private StreamsBuilder builder;
-    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
 
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private TopologyTestDriver driver;
@@ -222,12 +225,11 @@ public class KStreamImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
         final KStream<String, String> stream = builder.stream(input, consumed);
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         driver.pipeInput(recordFactory.create(input, "a", "b"));
-        assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b")));
+        assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("a:b")));
     }
 
     @Test
@@ -235,13 +237,12 @@ public class KStreamImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
         final KStream<String, String> stream = builder.stream(input, consumed);
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         stream.to("to-topic", Produced.with(stringSerde, stringSerde));
         builder.stream("to-topic", consumed).process(processorSupplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         driver.pipeInput(recordFactory.create(input, "e", "f"));
-        assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f")));
+        assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f")));
     }
 
     @Test
@@ -519,7 +520,6 @@ public class KStreamImplTest {
         final KStream<String, String> source2 = builder.stream(topic2);
         final KStream<String, String> merged = source1.merge(source2);
 
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
@@ -529,7 +529,7 @@ public class KStreamImplTest {
         driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
         driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
 
-        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
     }
     
     @Test
@@ -545,7 +545,6 @@ public class KStreamImplTest {
         final KStream<String, String> source4 = builder.stream(topic4);
         final KStream<String, String> merged = source1.merge(source2).merge(source3).merge(source4);
 
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
@@ -560,14 +559,13 @@ public class KStreamImplTest {
         driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
-                     processorSupplier.processed);
+                     processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
     public void shouldProcessFromSourceThatMatchPattern() {
         final KStream<String, String> pattern2Source = builder.stream(Pattern.compile("topic-\\d"));
 
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         pattern2Source.process(processorSupplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
@@ -579,7 +577,7 @@ public class KStreamImplTest {
         driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
-                processorSupplier.processed);
+                processorSupplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -591,7 +589,6 @@ public class KStreamImplTest {
         final KStream<String, String> source3 = builder.stream(topic3);
         final KStream<String, String> merged = pattern2Source1.merge(pattern2Source2).merge(source3);
 
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         merged.process(processorSupplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
@@ -603,6 +600,6 @@ public class KStreamImplTest {
         driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
 
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
-                processorSupplier.processed);
+                processorSupplier.theCapturedProcessor().processed);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 63a040a..5d849ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -119,9 +120,7 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
         joined = stream1.join(
@@ -129,7 +128,7 @@ public class KStreamKStreamJoinTest {
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
             Joined.with(intSerde, stringSerde, stringSerde));
-        joined.process(processor);
+        joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -138,6 +137,8 @@ public class KStreamKStreamJoinTest {
 
         driver = new TopologyTestDriver(builder.build(), props);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         // push two items to the primary stream. the other window is empty
         // w1 = {}
         // w2 = {}
@@ -220,9 +221,7 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -231,7 +230,7 @@ public class KStreamKStreamJoinTest {
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
             Joined.with(intSerde, stringSerde, stringSerde));
-        joined.process(processor);
+        joined.process(supplier);
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
@@ -239,6 +238,8 @@ public class KStreamKStreamJoinTest {
 
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         // push two items to the primary stream. the other window is empty.this should produce two items
         // w1 = {}
         // w2 = {}
@@ -323,9 +324,7 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -334,7 +333,7 @@ public class KStreamKStreamJoinTest {
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(100),
             Joined.with(intSerde, stringSerde, stringSerde));
-        joined.process(processor);
+        joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -352,6 +351,8 @@ public class KStreamKStreamJoinTest {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time));
         }
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         processor.checkAndClearProcessResult();
 
         // push two items to the other stream. this should produce two items.
@@ -543,9 +544,7 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -556,7 +555,7 @@ public class KStreamKStreamJoinTest {
             Joined.with(intSerde,
                 stringSerde,
                 stringSerde));
-        joined.process(processor);
+        joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -565,6 +564,8 @@ public class KStreamKStreamJoinTest {
 
         driver = new TopologyTestDriver(builder.build(), props, time);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
         }
@@ -653,9 +654,8 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
 
-        processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -664,7 +664,7 @@ public class KStreamKStreamJoinTest {
             MockValueJoiner.TOSTRING_JOINER,
             JoinWindows.of(0).before(100),
             Joined.with(intSerde, stringSerde, stringSerde));
-        joined.process(processor);
+        joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -673,6 +673,8 @@ public class KStreamKStreamJoinTest {
 
         driver = new TopologyTestDriver(builder.build(), props, time);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i));
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index cb1aaf1..c67e13d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -83,9 +84,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -93,7 +92,7 @@ public class KStreamKStreamLeftJoinTest {
                                   MockValueJoiner.TOSTRING_JOINER,
                                   JoinWindows.of(100),
                                   Joined.with(intSerde, stringSerde, stringSerde));
-        joined.process(processor);
+        joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -102,6 +101,8 @@ public class KStreamKStreamLeftJoinTest {
 
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         // push two items to the primary stream. the other window is empty
         // w1 {}
         // w2 {}
@@ -168,9 +169,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -178,7 +177,7 @@ public class KStreamKStreamLeftJoinTest {
                                   MockValueJoiner.TOSTRING_JOINER,
                                   JoinWindows.of(100),
                                   Joined.with(intSerde, stringSerde, stringSerde));
-        joined.process(processor);
+        joined.process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -187,6 +186,8 @@ public class KStreamKStreamLeftJoinTest {
 
         driver = new TopologyTestDriver(builder.build(), props, time);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         // push two items to the primary stream. the other window is empty. this should produce two items
         // w1 = {}
         // w2 = {}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 5b2a797..ec31b5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -54,9 +55,11 @@ public class KStreamKTableJoinTest {
     private final Serde<Integer> intSerde = Serdes.Integer();
     private final Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
-    private TopologyTestDriver driver;
-    private MockProcessorSupplier<Integer, String> processor;
+
     private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockProcessor<Integer, String> processor;
+    private TopologyTestDriver driver;
     private StreamsBuilder builder;
 
     @Before
@@ -66,11 +69,11 @@ public class KStreamKTableJoinTest {
         final KStream<Integer, String> stream;
         final KTable<Integer, String> table;
 
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
         stream = builder.stream(streamTopic, consumed);
         table = builder.table(tableTopic, consumed);
-        stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
+        stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test");
@@ -80,6 +83,8 @@ public class KStreamKTableJoinTest {
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
         driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+        processor = supplier.theCapturedProcessor();
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 669f4c7..735f71c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -51,7 +52,8 @@ public class KStreamKTableLeftJoinTest {
     final private Serde<String> stringSerde = Serdes.String();
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
     private TopologyTestDriver driver;
-    private MockProcessorSupplier<Integer, String> processor;
+    private MockProcessor<Integer, String> processor;
+
     private final int[] expectedKeys = {0, 1, 2, 3};
     private StreamsBuilder builder;
 
@@ -63,11 +65,11 @@ public class KStreamKTableLeftJoinTest {
         final KStream<Integer, String> stream;
         final KTable<Integer, String> table;
 
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
         stream = builder.stream(streamTopic, consumed);
         table = builder.table(tableTopic, consumed);
-        stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
+        stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier);
 
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test");
@@ -77,6 +79,8 @@ public class KStreamKTableLeftJoinTest {
         props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
         driver = new TopologyTestDriver(builder.build(), props, 0L);
+
+        processor = supplier.theCapturedProcessor();
     }
 
     private void pushToStream(final int messageCount, final String valuePrefix) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index bb22204..b0a383b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -81,22 +81,22 @@ public class KStreamMapTest {
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
         KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
-        MockProcessorSupplier<String, Integer> processor;
+        MockProcessorSupplier<String, Integer> supplier;
 
-        processor = new MockProcessorSupplier<>();
-        stream.map(mapper).process(processor);
+        supplier = new MockProcessorSupplier<>();
+        stream.map(mapper).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
             driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
         }
 
-        assertEquals(4, processor.processed.size());
+        assertEquals(4, supplier.theCapturedProcessor().processed.size());
 
         String[] expected = new String[]{"V0:0", "V1:1", "V2:2", "V3:3"};
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+            assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 17a13e0..ed11038 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -44,6 +44,9 @@ public class KStreamMapValuesTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
+    final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
+
+
     private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
     private TopologyTestDriver driver;
     private final Properties props = new Properties();
@@ -81,9 +84,8 @@ public class KStreamMapValuesTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
-        stream.mapValues(mapper).process(processor);
+        stream.mapValues(mapper).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
@@ -91,7 +93,7 @@ public class KStreamMapValuesTest {
         }
         String[] expected = {"1:1", "10:2", "100:3", "1000:4"};
 
-        assertArrayEquals(expected, processor.processed.toArray());
+        assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 
     @Test
@@ -109,9 +111,8 @@ public class KStreamMapValuesTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, String> stream;
-        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
-        stream.mapValues(mapper).process(processor);
+        stream.mapValues(mapper).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
         for (int expectedKey : expectedKeys) {
@@ -119,7 +120,7 @@ public class KStreamMapValuesTest {
         }
         String[] expected = {"1:2", "10:12", "100:103", "1000:1004"};
 
-        assertArrayEquals(expected, processor.processed.toArray());
+        assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 0bf6452..1abc0b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -90,9 +90,9 @@ public class KStreamSelectKeyTest {
 
         KStream<String, Integer>  stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));
 
-        MockProcessorSupplier<String, Integer> processor = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        stream.selectKey(selector).process(processor);
+        stream.selectKey(selector).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
 
@@ -100,10 +100,10 @@ public class KStreamSelectKeyTest {
             driver.pipeInput(recordFactory.create(expectedValue));
         }
 
-        assertEquals(3, processor.processed.size());
+        assertEquals(3, supplier.theCapturedProcessor().processed.size());
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+            assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
         }
 
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index aa0cf7e..1567fe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -48,6 +48,7 @@ public class KStreamTransformTest {
     private String topicName = "topic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
+
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
     private TopologyTestDriver driver;
     private final Properties props = new Properties();
@@ -77,34 +78,26 @@ public class KStreamTransformTest {
     public void testTransform() {
         StreamsBuilder builder = new StreamsBuilder();
 
-        TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
-            new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
-                public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
-                    return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
-
-                        private int total = 0;
+        final TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier = new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+            public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
+                return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
 
-                        @Override
-                        public void init(ProcessorContext context) {
-                        }
+                    private int total = 0;
 
-                        @Override
-                        public KeyValue<Integer, Integer> transform(Number key, Number value) {
-                            total += value.intValue();
-                            return KeyValue.pair(key.intValue() * 2, total);
-                        }
+                    @Override
+                    public void init(final ProcessorContext context) {}
 
-                        @Override
-                        public KeyValue<Integer, Integer> punctuate(long timestamp) {
-                            return KeyValue.pair(-1, (int) timestamp);
-                        }
+                    @Override
+                    public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
+                        total += value.intValue();
+                        return KeyValue.pair(key.intValue() * 2, total);
+                    }
 
-                        @Override
-                        public void close() {
-                        }
-                    };
-                }
-            };
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
@@ -117,15 +110,18 @@ public class KStreamTransformTest {
             kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
         }
 
-        kstreamDriver.punctuate(2);
-        kstreamDriver.punctuate(3);
+        // TODO: uncomment once kstreamDriver has been replaced with TopologyTestDriver
+        //kstreamDriver.punctuate(2);
+        //kstreamDriver.punctuate(3);
 
-        assertEquals(6, processor.processed.size());
+        //assertEquals(6, processor.theCapturedProcessor().processed.size());
 
-        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+        //String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+
+        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+            assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
         }
     }
 
@@ -133,40 +129,34 @@ public class KStreamTransformTest {
     public void testTransformWithNewDriverAndPunctuator() {
         StreamsBuilder builder = new StreamsBuilder();
 
-        TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
-            new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
-                public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
-                    return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
-
-                        private int total = 0;
-
-                        @Override
-                        public void init(final ProcessorContext context) {
-                            context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
-                                @Override
-                                public void punctuate(long timestamp) {
-                                    context.forward(-1, (int) timestamp);
-                                }
-                            });
-                        }
-
-                        @Override
-                        public KeyValue<Integer, Integer> transform(Number key, Number value) {
-                            total += value.intValue();
-                            return KeyValue.pair(key.intValue() * 2, total);
-                        }
-
-                        @Override
-                        public KeyValue<Integer, Integer> punctuate(long timestamp) {
-                            return null;
-                        }
-
-                        @Override
-                        public void close() {
-                        }
-                    };
-                }
-            };
+        TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier = new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+            public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
+                return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
+
+                    private int total = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+                            @Override
+                            public void punctuate(long timestamp) {
+                                context.forward(-1, (int) timestamp);
+                            }
+                        });
+                    }
+
+                    @Override
+                    public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
+                        total += value.intValue();
+                        return KeyValue.pair(key.intValue() * 2, total);
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
@@ -184,12 +174,12 @@ public class KStreamTransformTest {
         // This tick further advances the clock to 3, which leads to the "-1:3" result
         driver.advanceWallClockTime(1);
 
-        assertEquals(6, processor.processed.size());
+        assertEquals(6, processor.theCapturedProcessor().processed.size());
 
         String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+            assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 59a6a21..6bfc813 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -49,6 +49,8 @@ public class KStreamTransformValuesTest {
     private String topicName = "topic";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
+    final private MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
+
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
     private TopologyTestDriver driver;
     private final Properties props = new Properties();
@@ -107,9 +109,8 @@ public class KStreamTransformValuesTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, Integer> stream;
-        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
-        stream.transformValues(valueTransformerSupplier).process(processor);
+        stream.transformValues(valueTransformerSupplier).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
 
@@ -118,7 +119,7 @@ public class KStreamTransformValuesTest {
         }
         String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
 
-        assertArrayEquals(expected, processor.processed.toArray());
+        assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 
     @Test
@@ -151,9 +152,8 @@ public class KStreamTransformValuesTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         KStream<Integer, Integer> stream;
-        MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
-        stream.transformValues(valueTransformerSupplier).process(processor);
+        stream.transformValues(valueTransformerSupplier).process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props);
 
@@ -162,7 +162,7 @@ public class KStreamTransformValuesTest {
         }
         String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"};
 
-        assertArrayEquals(expected, processor.processed.toArray());
+        assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 
 
@@ -226,13 +226,6 @@ public class KStreamTransformValuesTest {
         } catch (final StreamsException e) {
             // expected
         }
-
-        try {
-            transformValueProcessor.punctuate(0);
-            fail("should not allow ValueTransformer#puntuate() to return not-null value");
-        } catch (final StreamsException e) {
-            // expected
-        }
     }
 
     private static final class BadValueTransformer implements ValueTransformerWithKey<Integer, Integer, Integer> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index fc31db9..9050edb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -37,12 +37,14 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -85,8 +87,8 @@ public class KStreamWindowAggregateTest {
             .groupByKey(Serialized.with(strSerde, strSerde))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
 
-        final MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
@@ -128,7 +130,7 @@ public class KStreamWindowAggregateTest {
                 "[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2",
                 "[C@5/15]:0+3+3", "[C@10/20]:0+3"
             ),
-            proc2.processed
+            supplier.theCapturedProcessor().processed
         );
     }
 
@@ -143,24 +145,22 @@ public class KStreamWindowAggregateTest {
             .groupByKey(Serialized.with(strSerde, strSerde))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized");
 
-        final MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
-        table1.toStream().process(proc1);
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table1.toStream().process(supplier);
 
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic2, Consumed.with(strSerde, strSerde)).groupByKey(Serialized.with(strSerde, strSerde))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic2-Canonized");
 
-        final MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        table2.toStream().process(supplier);
 
 
-        final MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
         table1.join(table2, new ValueJoiner<String, String, String>() {
             @Override
             public String apply(final String p1, final String p2) {
                 return p1 + "%" + p2;
             }
-        }).toStream().process(proc3);
+        }).toStream().process(supplier);
 
         driver = new TopologyTestDriver(builder.build(), props, 0L);
 
@@ -170,15 +170,17 @@ public class KStreamWindowAggregateTest {
         driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
         driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
 
-        proc1.checkAndClearProcessResult(
+        final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+
+        processors.get(0).checkAndClearProcessResult(
             "[A@0/10]:0+1",
             "[B@0/10]:0+2",
             "[C@0/10]:0+3",
             "[D@0/10]:0+4",
             "[A@0/10]:0+1+1"
         );
-        proc2.checkAndClearProcessResult();
-        proc3.checkAndClearProcessResult();
+        processors.get(1).checkAndClearProcessResult();
+        processors.get(2).checkAndClearProcessResult();
 
         driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
         driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
@@ -186,15 +188,15 @@ public class KStreamWindowAggregateTest {
         driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
         driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
 
-        proc1.checkAndClearProcessResult(
+        processors.get(0).checkAndClearProcessResult(
             "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
             "[B@0/10]:0+2+2", "[B@5/15]:0+2",
             "[D@0/10]:0+4+4", "[D@5/15]:0+4",
             "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
             "[C@0/10]:0+3+3", "[C@5/15]:0+3"
         );
-        proc2.checkAndClearProcessResult();
-        proc3.checkAndClearProcessResult();
+        processors.get(1).checkAndClearProcessResult();
+        processors.get(2).checkAndClearProcessResult();
 
         driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
         driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
@@ -202,15 +204,15 @@ public class KStreamWindowAggregateTest {
         driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
         driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
 
-        proc1.checkAndClearProcessResult();
-        proc2.checkAndClearProcessResult(
+        processors.get(0).checkAndClearProcessResult();
+        processors.get(1).checkAndClearProcessResult(
             "[A@0/10]:0+a",
             "[B@0/10]:0+b",
             "[C@0/10]:0+c",
             "[D@0/10]:0+d",
             "[A@0/10]:0+a+a"
         );
-        proc3.checkAndClearProcessResult(
+        processors.get(2).checkAndClearProcessResult(
             "[A@0/10]:0+1+1+1%0+a",
             "[B@0/10]:0+2+2+2%0+b",
             "[C@0/10]:0+3+3%0+c",
@@ -223,15 +225,15 @@ public class KStreamWindowAggregateTest {
         driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
         driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
 
-        proc1.checkAndClearProcessResult();
-        proc2.checkAndClearProcessResult(
+        processors.get(0).checkAndClearProcessResult();
+        processors.get(1).checkAndClearProcessResult(
             "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
             "[B@0/10]:0+b+b", "[B@5/15]:0+b",
             "[D@0/10]:0+d+d", "[D@5/15]:0+d",
             "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
             "[C@0/10]:0+c+c", "[C@5/15]:0+c"
         );
-        proc3.checkAndClearProcessResult(
+        processors.get(2).checkAndClearProcessResult(
             "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
             "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
             "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index df8d292..a769b49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -50,9 +51,10 @@ import static org.junit.Assert.assertEquals;
 
 public class KTableAggregateTest {
 
-    final private Serde<String> stringSerde = Serdes.String();
+    private final Serde<String> stringSerde = Serdes.String();
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde, stringSerde);
+    private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
 
     private File stateDir = null;
 
@@ -70,7 +72,7 @@ public class KTableAggregateTest {
     public void testAggBasic() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
-        final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
@@ -81,7 +83,7 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        table2.toStream().process(proc);
+        table2.toStream().process(supplier);
 
         driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
 
@@ -110,7 +112,7 @@ public class KTableAggregateTest {
                 "C:0+5",
                 "D:0+6",
                 "B:0+2-2+4-4+7",
-                "C:0+5-5+8"), proc.processed);
+                "C:0+5-5+8"), supplier.theCapturedProcessor().processed);
     }
 
 
@@ -118,7 +120,6 @@ public class KTableAggregateTest {
     public void testAggCoalesced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
-        final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(),
@@ -129,7 +130,7 @@ public class KTableAggregateTest {
             stringSerde,
             "topic1-Canonized");
 
-        table2.toStream().process(proc);
+        table2.toStream().process(supplier);
 
         driver.setUp(builder, stateDir);
 
@@ -138,7 +139,7 @@ public class KTableAggregateTest {
         driver.process(topic1, "A", "4");
         driver.flushState();
         assertEquals(Utils.mkList(
-            "A:0+4"), proc.processed);
+            "A:0+4"), supplier.theCapturedProcessor().processed);
     }
 
 
@@ -146,7 +147,6 @@ public class KTableAggregateTest {
     public void testAggRepartition() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
-        final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@@ -170,7 +170,7 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        table2.toStream().process(proc);
+        table2.toStream().process(supplier);
 
         driver.setUp(builder, stateDir);
 
@@ -200,10 +200,10 @@ public class KTableAggregateTest {
                 "2:0+2-2", "4:0+4",
                   //noop
                 "4:0+4-4", "7:0+7"
-                ), proc.processed);
+                ), supplier.theCapturedProcessor().processed);
     }
 
-    private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Long> proc) {
+    private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier<String, Object> supplier) {
         driver.setUp(builder, stateDir);
 
         driver.process(input, "A", "green");
@@ -225,53 +225,53 @@ public class KTableAggregateTest {
             "green:1", "blue:1",
             "yellow:1",
             "green:2"
-        ), proc.processed);
+        ), supplier.theCapturedProcessor().processed);
     }
 
     @Test
     public void testCount() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
-        final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
                 .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
                 .count("count")
                 .toStream()
-                .process(proc);
+                .process(supplier);
 
-        testCountHelper(builder, input, proc);
+        testCountHelper(builder, input, supplier);
     }
 
     @Test
     public void testCountWithInternalStore() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
-        final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
             .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
             .count()
             .toStream()
-            .process(proc);
+            .process(supplier);
 
-        testCountHelper(builder, input, proc);
+        testCountHelper(builder, input, supplier);
     }
 
     @Test
     public void testCountCoalesced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
-        final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Long> supplier = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
             .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied)
             .count("count")
             .toStream()
-            .process(proc);
+            .process(supplier);
 
         driver.setUp(builder, stateDir);
 
+        final MockProcessor<String, Long> proc = supplier.theCapturedProcessor();
+
         driver.process(input, "A", "green");
         driver.process(input, "B", "green");
         driver.process(input, "A", "blue");
@@ -291,7 +291,7 @@ public class KTableAggregateTest {
     public void testRemoveOldBeforeAddNew() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
-        final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
 
         builder.table(input, consumed)
                 .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@@ -321,10 +321,12 @@ public class KTableAggregateTest {
                     }
                 }, Serdes.String(), "someStore")
                 .toStream()
-                .process(proc);
+                .process(supplier);
 
         driver.setUp(builder, stateDir);
 
+        final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
+
         driver.process(input, "11", "A");
         driver.flushState();
         driver.process(input, "12", "B");
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 657e05d..bde771b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -21,11 +21,13 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockMapper;
@@ -35,6 +37,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -57,10 +60,9 @@ public class KTableFilterTest {
                               final KTable<String, Integer> table2,
                               final KTable<String, Integer> table3,
                               final String topic) {
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
-        MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
-        table3.toStream().process(proc3);
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+        table3.toStream().process(supplier);
 
         driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
@@ -73,8 +75,10 @@ public class KTableFilterTest {
         driver.process(topic, "B", null);
         driver.flushState();
 
-        proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
-        proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
+        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+        processors.get(0).checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
+        processors.get(1).checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
     }
 
     @Test
@@ -269,11 +273,10 @@ public class KTableFilterTest {
                                           final KTableImpl<String, Integer, Integer> table1,
                                           final KTableImpl<String, Integer, Integer> table2,
                                           final String topic1) {
-        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc1", proc1, table1.name);
-        builder.build().addProcessor("proc2", proc2, table2.name);
+        builder.build().addProcessor("proc1", supplier, table1.name);
+        builder.build().addProcessor("proc2", supplier, table2.name);
 
         driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
@@ -282,25 +285,27 @@ public class KTableFilterTest {
         driver.process(topic1, "C", 1);
         driver.flushState();
 
-        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+        processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
         driver.process(topic1, "A", 2);
         driver.process(topic1, "B", 2);
         driver.flushState();
-        proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
-        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
         driver.process(topic1, "A", 3);
         driver.flushState();
-        proc1.checkAndClearProcessResult("A:(3<-null)");
-        proc2.checkAndClearProcessResult("A:(null<-null)");
+        processors.get(0).checkAndClearProcessResult("A:(3<-null)");
+        processors.get(1).checkAndClearProcessResult("A:(null<-null)");
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
         driver.flushState();
-        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
-        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+        processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
     }
 
 
@@ -348,11 +353,11 @@ public class KTableFilterTest {
                                        final String topic1) {
         table2.enableSendingOldValues();
 
-        MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final Topology topology = builder.build();
 
-        builder.build().addProcessor("proc1", proc1, table1.name);
-        builder.build().addProcessor("proc2", proc2, table2.name);
+        topology.addProcessor("proc1", supplier, table1.name);
+        topology.addProcessor("proc2", supplier, table2.name);
 
         driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
 
@@ -361,25 +366,27 @@ public class KTableFilterTest {
         driver.process(topic1, "C", 1);
         driver.flushState();
 
-        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-        proc2.checkEmptyAndClearProcessResult();
+        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+        processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+        processors.get(1).checkEmptyAndClearProcessResult();
 
         driver.process(topic1, "A", 2);
         driver.process(topic1, "B", 2);
         driver.flushState();
-        proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
-        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+        processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+        processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
         driver.process(topic1, "A", 3);
         driver.flushState();
-        proc1.checkAndClearProcessResult("A:(3<-2)");
-        proc2.checkAndClearProcessResult("A:(null<-2)");
+        processors.get(0).checkAndClearProcessResult("A:(3<-2)");
+        processors.get(1).checkAndClearProcessResult("A:(null<-2)");
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
         driver.flushState();
-        proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
-        proc2.checkAndClearProcessResult("B:(null<-2)");
+        processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+        processors.get(1).checkAndClearProcessResult("B:(null<-2)");
     }
 
     @Test
@@ -424,11 +431,11 @@ public class KTableFilterTest {
                                                  final KTableImpl<String, String, String> table1,
                                                  final KTableImpl<String, String, String> table2,
                                                  final String topic1) {
-        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
-        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+        final Topology topology = builder.build();
 
-        builder.build().addProcessor("proc1", proc1, table1.name);
-        builder.build().addProcessor("proc2", proc2, table2.name);
+        topology.addProcessor("proc1", supplier, table1.name);
+        topology.addProcessor("proc2", supplier, table2.name);
 
         driver.setUp(builder, stateDir, stringSerde, stringSerde);
 
@@ -436,8 +443,10 @@ public class KTableFilterTest {
         driver.process(topic1, "B", "reject");
         driver.process(topic1, "C", "reject");
         driver.flushState();
-        proc1.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
-        proc2.checkEmptyAndClearProcessResult();
+
+        final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
+        processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
+        processors.get(1).checkEmptyAndClearProcessResult();
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index a7aed2e..ae1e285 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
@@ -47,6 +48,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.lang.reflect.Field;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -80,8 +82,8 @@ public class KTableImplTest {
 
         KTable<String, String> table1 = builder.table(topic1, consumed);
 
-        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
-        table1.toStream().process(proc1);
+        MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+        table1.toStream().process(supplier);
 
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
             @Override
@@ -90,8 +92,7 @@ public class KTableImplTest {
             }
         });
 
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        table2.toStream().process(supplier);
 
         KTable<String, Integer> table3 = table2.filter(new Predicate<String, Integer>() {
             @Override
@@ -100,13 +101,11 @@ public class KTableImplTest {
             }
         });
 
-        MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
-        table3.toStream().process(proc3);
+        table3.toStream().process(supplier);
 
         KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2, storeName2);
 
-        MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
-        table4.toStream().process(proc4);
+        table4.toStream().process(supplier);
 
         driver.setUp(builder, stateDir);
 
@@ -120,10 +119,11 @@ public class KTableImplTest {
         driver.flushState();
         driver.flushState();
 
-        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed);
-        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
-        assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), proc3.processed);
-        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc4.processed);
+        final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(0).processed);
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), processors.get(1).processed);
+        assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), processors.get(2).processed);
+        assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(3).processed);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 9f5603b..0ca388f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -67,7 +68,7 @@ public class KTableKTableInnerJoinTest {
 
     private void doTestJoin(final StreamsBuilder builder,
                             final int[] expectedKeys,
-                            final MockProcessorSupplier<Integer, String> processor,
+                            final MockProcessorSupplier<Integer, String> supplier,
                             final KTable<Integer, String> joined) {
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -79,6 +80,8 @@ public class KTableKTableInnerJoinTest {
         driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         final KTableValueGetter<Integer, String> getter = getterSupplier.get();
         getter.init(driver.context());
 
@@ -168,15 +171,13 @@ public class KTableKTableInnerJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
-        joined.toStream().process(processor);
+        joined.toStream().process(supplier);
 
-        doTestJoin(builder, expectedKeys, processor, joined);
+        doTestJoin(builder, expectedKeys, supplier, joined);
     }
 
     @Test
@@ -203,13 +204,15 @@ public class KTableKTableInnerJoinTest {
                                         final int[] expectedKeys,
                                         final KTable<Integer, String> table1,
                                         final KTable<Integer, String> table2,
-                                        final MockProcessorSupplier<Integer, String> proc,
+                                        final MockProcessorSupplier<Integer, String> supplier,
                                         final KTable<Integer, String> joined,
                                         final boolean sendOldValues) {
 
         driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
+        final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
         if (!sendOldValues) {
             assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
             assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
@@ -288,15 +291,15 @@ public class KTableKTableInnerJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> supplier;
 
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
-        proc = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        supplier = new MockProcessorSupplier<>();
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
+        doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false);
 
     }
 
@@ -309,15 +312,15 @@ public class KTableKTableInnerJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> supplier;
 
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
-        proc = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        supplier = new MockProcessorSupplier<>();
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
+        doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false);
 
     }
 
@@ -330,16 +333,16 @@ public class KTableKTableInnerJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> supplier;
 
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
-        proc = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        supplier = new MockProcessorSupplier<>();
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true);
+        doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, true);
 
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 6331b57..2eef302 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
@@ -80,9 +81,8 @@ public class KTableKTableLeftJoinTest {
         final KTable<Integer, String> table1 = builder.table(topic1, consumed);
         final KTable<Integer, String> table2 = builder.table(topic2, consumed);
         final KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
-        final MockProcessorSupplier<Integer, String> processor;
-        processor = new MockProcessorSupplier<>();
-        joined.toStream().process(processor);
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        joined.toStream().process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -94,6 +94,8 @@ public class KTableKTableLeftJoinTest {
         driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         final KTableValueGetter<Integer, String> getter = getterSupplier.get();
         getter.init(driver.context());
 
@@ -174,18 +176,20 @@ public class KTableKTableLeftJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> supplier;
 
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
-        proc = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        supplier = new MockProcessorSupplier<>();
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
+        final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
         assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
@@ -255,7 +259,7 @@ public class KTableKTableLeftJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> supplier;
 
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
@@ -263,12 +267,14 @@ public class KTableKTableLeftJoinTest {
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-        proc = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        supplier = new MockProcessorSupplier<>();
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver.setUp(builder, stateDir);
         driver.setTime(0L);
 
+        final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 16694d8..cf3321f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -74,13 +75,13 @@ public class KTableKTableOuterJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> processor;
+        final MockProcessorSupplier<Integer, String> supplier;
 
-        processor = new MockProcessorSupplier<>();
+        supplier = new MockProcessorSupplier<>();
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
-        joined.toStream().process(processor);
+        joined.toStream().process(supplier);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
@@ -91,6 +92,8 @@ public class KTableKTableOuterJoinTest {
 
         driver.setUp(builder, stateDir);
 
+        final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
         final KTableValueGetter<Integer, String> getter = getterSupplier.get();
         getter.init(driver.context());
 
@@ -179,17 +182,19 @@ public class KTableKTableOuterJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> supplier;
 
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
-        proc = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        supplier = new MockProcessorSupplier<>();
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver.setUp(builder, stateDir);
 
+        final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
         assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
@@ -267,7 +272,7 @@ public class KTableKTableOuterJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> proc;
+        final MockProcessorSupplier<Integer, String> supplier;
 
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
@@ -275,11 +280,13 @@ public class KTableKTableOuterJoinTest {
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
-        proc = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+        supplier = new MockProcessorSupplier<>();
+        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
         driver.setUp(builder, stateDir);
 
+        final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 81797cb..78c7902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -78,9 +78,9 @@ public class KTableMapKeysTest {
         final int[] originalKeys = new int[]{1, 2, 3};
         final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"};
 
-        MockProcessorSupplier<String, String> processor = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
 
-        convertedStream.process(processor);
+        convertedStream.process(supplier);
 
         driver.setUp(builder, stateDir);
         for (int i = 0;  i < originalKeys.length; i++) {
@@ -88,10 +88,10 @@ public class KTableMapKeysTest {
         }
         driver.flushState();
 
-        assertEquals(3, processor.processed.size());
+        assertEquals(3, supplier.theCapturedProcessor().processed.size());
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.processed.get(i));
+            assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
         }
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 5d92846..3cd7701 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -54,7 +55,7 @@ public class KTableMapValuesTest {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
-    private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
+    private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) {
         driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
 
         driver.process(topic1, "A", "1");
@@ -62,7 +63,7 @@ public class KTableMapValuesTest {
         driver.process(topic1, "C", "3");
         driver.process(topic1, "D", "4");
         driver.flushState();
-        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -79,10 +80,10 @@ public class KTableMapValuesTest {
             }
         });
 
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
 
-        doTestKTable(builder, topic1, proc2);
+        doTestKTable(builder, topic1, supplier);
     }
 
     @Test
@@ -99,10 +100,10 @@ public class KTableMapValuesTest {
             }
         }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
 
-        MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
 
-        doTestKTable(builder, topic1, proc2);
+        doTestKTable(builder, topic1, supplier);
     }
 
     private void doTestValueGetter(final StreamsBuilder builder,
@@ -282,11 +283,14 @@ public class KTableMapValuesTest {
                     }
                 });
 
-        MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc", proc, table2.name);
+        builder.build().addProcessor("proc", supplier, table2.name);
 
         driver.setUp(builder, stateDir);
+
+        final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+
         assertFalse(table1.sendingOldValueEnabled());
         assertFalse(table2.sendingOldValueEnabled());
 
@@ -332,11 +336,14 @@ public class KTableMapValuesTest {
 
         table2.enableSendingOldValues();
 
-        MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc", proc, table2.name);
+        builder.build().addProcessor("proc", supplier, table2.name);
 
         driver.setUp(builder, stateDir);
+
+        final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+
         assertTrue(table1.sendingOldValueEnabled());
         assertTrue(table2.sendingOldValueEnabled());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 97c9c7f..70efb41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -61,8 +62,8 @@ public class KTableSourceTest {
 
         final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
 
-        final MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
-        table1.toStream().process(proc1);
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        table1.toStream().process(supplier);
 
         driver.setUp(builder, stateDir);
         driver.process(topic1, "A", 1);
@@ -74,7 +75,7 @@ public class KTableSourceTest {
         driver.process(topic1, "B", null);
         driver.flushState();
 
-        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed);
+        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), supplier.theCapturedProcessor().processed);
     }
 
     @Test
@@ -145,11 +146,14 @@ public class KTableSourceTest {
 
         final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
-        final MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc1", proc1, table1.name);
+        builder.build().addProcessor("proc1", supplier, table1.name);
 
         driver.setUp(builder, stateDir);
+
+        final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
@@ -187,12 +191,14 @@ public class KTableSourceTest {
 
         assertTrue(table1.sendingOldValueEnabled());
 
-        final MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc1", proc1, table1.name);
+        builder.build().addProcessor("proc1", supplier, table1.name);
 
         driver.setUp(builder, stateDir);
 
+        final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index e3b888d..d1d25e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -638,10 +638,6 @@ public class TopologyBuilderTest {
                 }
 
                 @Override
-                public void punctuate(long timestamp) {
-                }
-
-                @Override
                 public void close() {
                 }
             };
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index aac275d..43dc38e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -177,9 +177,6 @@ public class AbstractProcessorContextTest {
         }
 
         @Override
-        public void schedule(final long interval) {}
-
-        @Override
         public <K, V> void forward(final K key, final V value) {}
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 5637dab..f3e369f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -64,8 +64,8 @@ public class GlobalStateTaskTest {
         new String[]{topic2},
         new IntegerDeserializer(),
         new IntegerDeserializer());
-    private final MockProcessorNode processorOne = new MockProcessorNode<>(-1);
-    private final MockProcessorNode processorTwo = new MockProcessorNode<>(-1);
+    private final MockProcessorNode processorOne = new MockProcessorNode<>();
+    private final MockProcessorNode processorTwo = new MockProcessorNode<>();
 
     private final Map<TopicPartition, Long> offsets = new HashMap<>();
     private final NoOpProcessorContext context = new NoOpProcessorContext();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b3663fa..149a158 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -607,9 +607,6 @@ public class InternalTopologyBuilderTest {
                 public void process(final Object key, final Object value) { }
 
                 @Override
-                public void punctuate(final long timestamp) { }
-
-                @Override
                 public void close() {
                 }
             };
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index a7a2610..0992063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -62,11 +62,6 @@ public class ProcessorNodeTest {
         }
 
         @Override
-        public void punctuate(final long timestamp) {
-            throw new RuntimeException();
-        }
-
-        @Override
         public void close() {
             throw new RuntimeException();
         }
@@ -84,11 +79,6 @@ public class ProcessorNodeTest {
         }
 
         @Override
-        public void punctuate(final long timestamp) {
-
-        }
-
-        @Override
         public void close() {
 
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a80b25d..51d4e05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -465,11 +464,6 @@ public class ProcessorTopologyTest {
         public void process(final String key, final String value) {
             context().forward(key, value);
         }
-
-        @Override
-        public void punctuate(final long streamTime) {
-            context().forward(Long.toString(streamTime), "punctuate");
-        }
     }
 
     /**
@@ -510,14 +504,6 @@ public class ProcessorTopologyTest {
                 context().forward(key, value + "(" + (i + 1) + ")", i);
             }
         }
-
-        @SuppressWarnings("deprecation")
-        @Override
-        public void punctuate(final long streamTime) {
-            for (int i = 0; i != numChildren; ++i) {
-                context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
-            }
-        }
     }
 
     /**
@@ -538,19 +524,10 @@ public class ProcessorTopologyTest {
                 context().forward(key, value + "(" + (i + 1) + ")",  "sink" + i);
             }
         }
-
-        @SuppressWarnings("deprecation")
-        @Override
-        public void punctuate(final long streamTime) {
-            for (int i = 0; i != numChildren; ++i) {
-                context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
-            }
-        }
     }
 
     /**
-     * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
-     * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
+     * A processor that stores each key-value pair in an in-memory key-value store registered with the context.
      */
     protected static class StatefulProcessor extends AbstractProcessor<String, String> {
         private KeyValueStore<String, String> store;
@@ -573,18 +550,6 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void punctuate(final long streamTime) {
-            int count = 0;
-            try (KeyValueIterator<String, String> iter = store.all()) {
-                while (iter.hasNext()) {
-                    iter.next();
-                    ++count;
-                }
-            }
-            context().forward(Long.toString(streamTime), count);
-        }
-
-        @Override
         public void close() {
             store.close();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index e799688..ee0d5a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -21,26 +21,24 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.test.MockProcessorNode;
 import org.junit.Test;
 
-import java.util.ArrayList;
-
 import static org.junit.Assert.assertEquals;
 
 public class PunctuationQueueTest {
 
+    private final MockProcessorNode<String, String> node = new MockProcessorNode<>();
+    private final PunctuationQueue queue = new PunctuationQueue();
+    private final Punctuator punctuator = new Punctuator() {
+        @Override
+        public void punctuate(final long timestamp) {
+            node.mockProcessor.punctuatedStreamTime.add(timestamp);
+        }
+    };
+
     @Test
     public void testPunctuationInterval() {
-        final TestProcessor processor = new TestProcessor();
-        final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
-        final PunctuationQueue queue = new PunctuationQueue();
-        final Punctuator punctuator = new Punctuator() {
-            @Override
-            public void punctuate(long timestamp) {
-                node.processor().punctuate(timestamp);
-            }
-        };
-
         final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
         final long now = sched.timestamp - 100L;
 
@@ -54,42 +52,32 @@ public class PunctuationQueueTest {
         };
 
         queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(0, processor.punctuatedAt.size());
+        assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(0, processor.punctuatedAt.size());
+        assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(1, processor.punctuatedAt.size());
+        assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(1, processor.punctuatedAt.size());
+        assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(2, processor.punctuatedAt.size());
+        assertEquals(2, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(3, processor.punctuatedAt.size());
+        assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(3, processor.punctuatedAt.size());
+        assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(4, processor.punctuatedAt.size());
+        assertEquals(4, node.mockProcessor.punctuatedStreamTime.size());
     }
 
     @Test
     public void testPunctuationIntervalCustomAlignment() {
-        final TestProcessor processor = new TestProcessor();
-        final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
-        final PunctuationQueue queue = new PunctuationQueue();
-        final Punctuator punctuator = new Punctuator() {
-            @Override
-            public void punctuate(long timestamp) {
-                node.processor().punctuate(timestamp);
-            }
-        };
-
         final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator);
         final long now = sched.timestamp - 50L;
 
@@ -103,42 +91,32 @@ public class PunctuationQueueTest {
         };
 
         queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(0, processor.punctuatedAt.size());
+        assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(0, processor.punctuatedAt.size());
+        assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(1, processor.punctuatedAt.size());
+        assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(1, processor.punctuatedAt.size());
+        assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(2, processor.punctuatedAt.size());
+        assertEquals(2, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(3, processor.punctuatedAt.size());
+        assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(3, processor.punctuatedAt.size());
+        assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(4, processor.punctuatedAt.size());
+        assertEquals(4, node.mockProcessor.punctuatedStreamTime.size());
     }
 
     @Test
     public void testPunctuationIntervalCancelFromPunctuator() {
-        final TestProcessor processor = new TestProcessor();
-        final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
-        final PunctuationQueue queue = new PunctuationQueue();
-        final Punctuator punctuator = new Punctuator() {
-            @Override
-            public void punctuate(long timestamp) {
-                node.processor().punctuate(timestamp);
-            }
-        };
-
         final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
         final long now = sched.timestamp - 100L;
 
@@ -154,35 +132,25 @@ public class PunctuationQueueTest {
         };
 
         queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(0, processor.punctuatedAt.size());
+        assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(1, processor.punctuatedAt.size());
+        assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
 
         queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
-        assertEquals(1, processor.punctuatedAt.size());
+        assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
     }
 
     private static class TestProcessor extends AbstractProcessor<String, String> {
 
-        public final ArrayList<Long> punctuatedAt = new ArrayList<>();
-
         @Override
-        public void init(ProcessorContext context) {
-        }
+        public void init(ProcessorContext context) {}
 
         @Override
-        public void process(String key, String value) {
-        }
+        public void process(String key, String value) {}
 
         @Override
-        public void punctuate(long streamTime) {
-            punctuatedAt.add(streamTime);
-        }
-
-        @Override
-        public void close() {
-        }
+        public void close() {}
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 598e47e..3a0fc4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -393,7 +393,7 @@ public class StreamTaskTest {
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
 
-        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
+        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
     }
 
     @SuppressWarnings("unchecked")
@@ -479,7 +479,7 @@ public class StreamTaskTest {
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
 
-        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
+        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
     }
 
     @SuppressWarnings("unchecked")
@@ -509,11 +509,11 @@ public class StreamTaskTest {
 
         assertTrue(task.process());
 
-        processorStreamTime.supplier.scheduleCancellable.cancel();
+        processorStreamTime.mockProcessor.scheduleCancellable.cancel();
 
         assertFalse(task.maybePunctuateStreamTime());
 
-        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
+        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
     }
 
     @Test
@@ -533,7 +533,7 @@ public class StreamTaskTest {
         time.sleep(20);
         assertTrue(task.maybePunctuateSystemTime());
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50);
+        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50);
     }
 
     @Test
@@ -544,7 +544,7 @@ public class StreamTaskTest {
         assertFalse(task.maybePunctuateSystemTime());
         time.sleep(9);
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
+        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
     }
 
     @Test
@@ -570,7 +570,7 @@ public class StreamTaskTest {
         time.sleep(5); // punctuate at now + 240, still aligned on the initial punctuation
         assertTrue(task.maybePunctuateSystemTime());
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
+        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
     }
 
     @Test
@@ -581,10 +581,10 @@ public class StreamTaskTest {
         final long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.scheduleCancellable.cancel();
+        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
         time.sleep(10);
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
+        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3ae7acb..5bc1934 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -894,10 +894,6 @@ public class StreamThreadTest {
                     @Override
                     public void process(final Object key, final Object value) {}
 
-                    @SuppressWarnings("deprecation")
-                    @Override
-                    public void punctuate(final long timestamp) {}
-
                     @Override
                     public void close() {}
                 };
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 5e61910..27a0094 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -198,9 +198,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     }
 
     @Override
-    public void schedule(final long interval) { }
-
-    @Override
     public void commit() { }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index c93a306..cf4460d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -205,21 +205,6 @@ public class KStreamTestDriver extends ExternalResource {
         return topicNode;
     }
 
-    public void punctuate(final long timestamp) {
-        final ProcessorNode prevNode = context.currentNode();
-        for (final ProcessorNode processor : topology.processors()) {
-            if (processor.processor() != null) {
-                context.setRecordContext(createRecordContext(context.topic(), timestamp));
-                context.setCurrentNode(processor);
-                try {
-                    processor.processor().punctuate(timestamp);
-                } finally {
-                    context.setCurrentNode(prevNode);
-                }
-            }
-        }
-    }
-
     public void setTime(final long timestamp) {
         context.setTime(timestamp);
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
similarity index 53%
copy from streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
copy to streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index bdc8d40..927be0b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -18,9 +18,7 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 
@@ -28,7 +26,7 @@ import java.util.ArrayList;
 
 import static org.junit.Assert.assertEquals;
 
-public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
+public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
     public final ArrayList<String> processed = new ArrayList<>();
     public final ArrayList<K> processedKeys = new ArrayList<>();
@@ -37,67 +35,50 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
     public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
     public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
 
-    private final long scheduleInterval;
-    private final PunctuationType punctuationType;
     public Cancellable scheduleCancellable;
 
-    public MockProcessorSupplier() {
-        this(-1L);
-    }
-
-    public MockProcessorSupplier(long scheduleInterval) {
-        this(scheduleInterval, PunctuationType.STREAM_TIME);
-    }
+    private final PunctuationType punctuationType;
+    private final long scheduleInterval;
 
-    public MockProcessorSupplier(long scheduleInterval, PunctuationType punctuationType) {
-        this.scheduleInterval = scheduleInterval;
+    public MockProcessor(final PunctuationType punctuationType, final long scheduleInterval) {
         this.punctuationType = punctuationType;
+        this.scheduleInterval = scheduleInterval;
     }
 
-    @Override
-    public Processor<K, V> get() {
-        return new MockProcessor(punctuationType);
+    public MockProcessor() {
+        this(PunctuationType.STREAM_TIME, -1);
     }
 
-    public class MockProcessor extends AbstractProcessor<K, V> {
-
-        PunctuationType punctuationType;
-
-        public MockProcessor(PunctuationType punctuationType) {
-            this.punctuationType = punctuationType;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-            if (scheduleInterval > 0L) {
-                scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() {
-                    @Override
-                    public void punctuate(long timestamp) {
-                        if (punctuationType == PunctuationType.STREAM_TIME) {
-                            assertEquals(timestamp, context().timestamp());
-                        }
-                        assertEquals(-1, context().partition());
-                        assertEquals(-1L, context().offset());
-
-                        (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
-                           .add(timestamp);
+    @Override
+    public void init(final ProcessorContext context) {
+        super.init(context);
+        if (scheduleInterval > 0L) {
+            scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() {
+                @Override
+                public void punctuate(final long timestamp) {
+                    if (punctuationType == PunctuationType.STREAM_TIME) {
+                        assertEquals(timestamp, context().timestamp());
                     }
-                });
-            }
+                    assertEquals(-1, context().partition());
+                    assertEquals(-1L, context().offset());
+
+                    (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
+                            .add(timestamp);
+                }
+            });
         }
+    }
 
-        @Override
-        public void process(K key, V value) {
-            processedKeys.add(key);
-            processedValues.add(value);
-            processed.add((key == null ? "null" : key) + ":" +
-                    (value == null ? "null" : value));
+    @Override
+    public void process(final K key, final V value) {
+        processedKeys.add(key);
+        processedValues.add(value);
+        processed.add((key == null ? "null" : key) + ":" +
+                (value == null ? "null" : value));
 
-        }
     }
 
-    public void checkAndClearProcessResult(String... expected) {
+    public void checkAndClearProcessResult(final String... expected) {
         assertEquals("the number of outputs:" + processed, expected.length, processed.size());
         for (int i = 0; i < expected.length; i++) {
             assertEquals("output[" + i + "]:", expected[i], processed.get(i));
@@ -107,13 +88,12 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
     }
 
     public void checkEmptyAndClearProcessResult() {
-
         assertEquals("the number of outputs:", 0, processed.size());
         processed.clear();
     }
 
-    public void checkAndClearPunctuateResult(PunctuationType type, long... expected) {
-        ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
+    public void checkAndClearPunctuateResult(final PunctuationType type, final long... expected) {
+        final ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
         assertEquals("the number of outputs:", expected.length, punctuated.size());
 
         for (int i = 0; i < expected.length; i++) {
@@ -122,5 +102,4 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
 
         processed.clear();
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index a526bfd..094cb03 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
@@ -29,9 +28,9 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     private static final String NAME = "MOCK-PROCESS-";
     private static final AtomicInteger INDEX = new AtomicInteger(1);
 
-    public final MockProcessorSupplier<K, V> supplier;
+    public final MockProcessor<K, V> mockProcessor;
+
     public boolean closed;
-    public long punctuatedAt;
     public boolean initialized;
 
     public MockProcessorNode(long scheduleInterval) {
@@ -39,13 +38,17 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     }
 
     public MockProcessorNode(long scheduleInterval, PunctuationType punctuationType) {
-        this(new MockProcessorSupplier<K, V>(scheduleInterval, punctuationType));
+        this(new MockProcessor<K, V>(punctuationType, scheduleInterval));
+    }
+
+    public MockProcessorNode() {
+        this(new MockProcessor<K, V>());
     }
 
-    private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
-        super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet());
+    private MockProcessorNode(final MockProcessor<K, V> mockProcessor) {
+        super(NAME + INDEX.getAndIncrement(), mockProcessor, Collections.<String>emptySet());
 
-        this.supplier = supplier;
+        this.mockProcessor = mockProcessor;
     }
 
     @Override
@@ -60,12 +63,6 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     }
 
     @Override
-    public void punctuate(final long timestamp, final Punctuator punctuator) {
-        super.punctuate(timestamp, punctuator);
-        this.punctuatedAt = timestamp;
-    }
-
-    @Override
     public void close() {
         super.close();
         this.closed = true;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index bdc8d40..aec47a4 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -16,30 +16,20 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
 public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
 
-    public final ArrayList<String> processed = new ArrayList<>();
-    public final ArrayList<K> processedKeys = new ArrayList<>();
-    public final ArrayList<V> processedValues = new ArrayList<>();
-
-    public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
-    public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
-
     private final long scheduleInterval;
     private final PunctuationType punctuationType;
-    public Cancellable scheduleCancellable;
+    private final List<MockProcessor<K, V>> processors = new ArrayList<>();
 
     public MockProcessorSupplier() {
         this(-1L);
@@ -56,71 +46,20 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
 
     @Override
     public Processor<K, V> get() {
-        return new MockProcessor(punctuationType);
-    }
-
-    public class MockProcessor extends AbstractProcessor<K, V> {
-
-        PunctuationType punctuationType;
-
-        public MockProcessor(PunctuationType punctuationType) {
-            this.punctuationType = punctuationType;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-            if (scheduleInterval > 0L) {
-                scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() {
-                    @Override
-                    public void punctuate(long timestamp) {
-                        if (punctuationType == PunctuationType.STREAM_TIME) {
-                            assertEquals(timestamp, context().timestamp());
-                        }
-                        assertEquals(-1, context().partition());
-                        assertEquals(-1L, context().offset());
-
-                        (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
-                           .add(timestamp);
-                    }
-                });
-            }
-        }
-
-        @Override
-        public void process(K key, V value) {
-            processedKeys.add(key);
-            processedValues.add(value);
-            processed.add((key == null ? "null" : key) + ":" +
-                    (value == null ? "null" : value));
-
-        }
-    }
-
-    public void checkAndClearProcessResult(String... expected) {
-        assertEquals("the number of outputs:" + processed, expected.length, processed.size());
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals("output[" + i + "]:", expected[i], processed.get(i));
-        }
-
-        processed.clear();
+        final MockProcessor<K, V> processor = new MockProcessor<>(punctuationType, scheduleInterval);
+        processors.add(processor);
+        return processor;
     }
 
-    public void checkEmptyAndClearProcessResult() {
-
-        assertEquals("the number of outputs:", 0, processed.size());
-        processed.clear();
+    // get the captured processor assuming that only one processor gets returned from this supplier
+    public MockProcessor<K, V> theCapturedProcessor() {
+        return capturedProcessors(1).get(0);
     }
 
-    public void checkAndClearPunctuateResult(PunctuationType type, long... expected) {
-        ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
-        assertEquals("the number of outputs:", expected.length, punctuated.size());
+    // get the captured processors with the expected number
+    public List<MockProcessor<K, V>> capturedProcessors(final int expectedNumberOfProcessors) {
+        assertEquals(expectedNumberOfProcessors, processors.size());
 
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
-        }
-
-        processed.clear();
+        return processors;
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 6b5d47a..e931c7e 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -57,10 +57,6 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public void schedule(final long interval) {
-    }
-
-    @Override
     public <K, V> void forward(final K key, final V value) {
         forwardedValues.put(key, value);
     }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 8f0ae93..4cee0ac 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -21,7 +21,6 @@ package org.apache.kafka.streams.scala
 package kstream
 
 import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 94c36ad..d3ccaec 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -284,14 +284,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
 
           override def init(context: ProcessorContext): Unit = transformerSupplier.init(context)
 
-          @deprecated ("Please use Punctuator functional interface at https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/Punctuator.html instead", "0.1.3") // scalastyle:ignore
-          override def punctuate(timestamp: Long): KeyValue[K1, V1] = {
-            transformerSupplier.punctuate(timestamp) match {
-              case (k1, v1) => KeyValue.pair[K1, V1](k1, v1)
-              case _ => null
-            }
-          }
-
           override def close(): Unit = transformerSupplier.close()
         }
       }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 1aa1978..226192f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -23,7 +23,6 @@ package kstream
 import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
 import org.apache.kafka.streams.state.WindowStore
 import org.apache.kafka.common.utils.Bytes
-import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 15b2da6..c387c36 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -361,14 +361,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         };
     }
 
-    @Override
-    public void schedule(final long interval) {
-        throw new UnsupportedOperationException(
-            "schedule() is deprecated and not supported in Mock. " +
-                "Use schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) instead."
-        );
-    }
-
     /**
      * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
      *
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 8c5ec46..934e043 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -361,10 +361,6 @@ public class MockProcessorContextTest {
             }
 
             @Override
-            public void punctuate(final long timestamp) {
-            }
-
-            @Override
             public void close() {
             }
         };
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 077b8ca..5259ef2 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -185,10 +185,6 @@ public class TopologyTestDriverTest {
         private boolean closed = false;
         private final List<Record> processedRecords = new ArrayList<>();
 
-        MockProcessor() {
-            this(Collections.<Punctuation>emptySet());
-        }
-
         MockProcessor(final Collection<Punctuation> punctuations) {
             this.punctuations = punctuations;
         }
@@ -208,10 +204,6 @@ public class TopologyTestDriverTest {
             context.forward(key, value);
         }
 
-        @SuppressWarnings("deprecation")
-        @Override
-        public void punctuate(long timestamp) {} // deprecated
-
         @Override
         public void close() {
             closed = true;
@@ -840,9 +832,6 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void punctuate(final long timestamp) {}
-
-        @Override
         public void close() {}
     }
 
@@ -870,9 +859,6 @@ public class TopologyTestDriverTest {
                         }
 
                         @Override
-                        public void punctuate(final long timestamp) {}
-
-                        @Override
                         public void close() {}
                     };
                 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.