Posted to commits@kafka.apache.org by vv...@apache.org on 2020/08/26 02:11:32 UTC

[kafka] branch kip-478-part-4 created (now eb55d70)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch kip-478-part-4
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at eb55d70  Convert test-utils and StateStore

This branch includes the following new commits:

     new eb55d70  Convert test-utils and StateStore

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/01: Convert test-utils and StateStore

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit eb55d70adadc0c692d29a4c23d039131ae2f1f80
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Aug 25 19:29:00 2020 -0500

    Convert test-utils and StateStore
---
 .../examples/wordcount/WordCountProcessorDemo.java |  21 +-
 .../examples/wordcount/WordCountProcessorTest.java |  10 +-
 .../apache/kafka/streams/processor/StateStore.java |  32 +-
 ...=> InternalProcessorContextReverseAdapter.java} |   6 +-
 .../processor/internals/ProcessorAdapter.java      |   2 +-
 .../internals/ProcessorContextAdapter.java         |   4 +-
 .../internals/ProcessorContextReverseAdapter.java  | 129 ++---
 .../internals/GlobalProcessorContextImplTest.java  |  11 +-
 .../internals/ProcessorContextImplTest.java        |   2 +-
 .../processor/api/MockProcessorContext.java        | 546 +++++++++++++++++++++
 .../kafka/streams/MockApiProcessorContextTest.java | 405 +++++++++++++++
 11 files changed, 1040 insertions(+), 128 deletions(-)

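For orientation before the diffs: KIP-478 replaces the old org.apache.kafka.streams.processor.Processor<K, V>
with the new org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut>, which also types the
output of ProcessorContext#forward. A minimal sketch of a processor against the new API, assuming only what
is visible in this commit (the class name UpperCaseProcessor is hypothetical):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;

    public class UpperCaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            // The context is typed by the declared output key/value types,
            // so forward() calls are checked at compile time.
            this.context = context;
        }

        @Override
        public void process(final String key, final String value) {
            context.forward(key, value.toUpperCase());
        }

        // close() has a default no-op implementation in the new API (note the
        // override removed from WordCountProcessorDemo below), so it can be omitted.
    }
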
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index c3f47da..a6a9b8a 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -54,17 +54,17 @@ import java.util.concurrent.CountDownLatch;
  */
 public final class WordCountProcessorDemo {
 
-    static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+    static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
 
         @Override
-        public Processor<String, String> get() {
-            return new Processor<String, String>() {
-                private ProcessorContext context;
+        public Processor<String, String, String, String> get() {
+            return new Processor<String, String, String, String>() {
+                private ProcessorContext<String, String> context;
                 private KeyValueStore<String, Integer> kvStore;
 
                 @Override
                 @SuppressWarnings("unchecked")
-                public void init(final ProcessorContext context) {
+                public void init(final ProcessorContext<String, String> context) {
                     this.context = context;
                     this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                         try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
@@ -79,7 +79,7 @@ public final class WordCountProcessorDemo {
                             }
                         }
                     });
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+                    this.kvStore = context.getStateStore("Counts");
                 }
 
                 @Override
@@ -96,9 +96,6 @@ public final class WordCountProcessorDemo {
                         }
                     }
                 }
-
-                @Override
-                public void close() {}
             };
         }
     }
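One detail worth noting in the hunk above: the unchecked cast on getStateStore() is gone because the
new ProcessorContext#getStateStore(String) declares a generic return type that is inferred at the call
site. A before/after sketch, using the store from this demo:

    // Old API: an unchecked cast was required.
    // final KeyValueStore<String, Integer> kvStore =
    //     (KeyValueStore<String, Integer>) context.getStateStore("Counts");

    // New API: the store type is inferred from the assignment target.
    final KeyValueStore<String, Integer> kvStore = context.getStateStore("Counts");
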
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index bec77e6..5ddda08 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.MockProcessorContext;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue;
 public class WordCountProcessorTest {
     @Test
     public void test() {
-        final MockProcessorContext context = new MockProcessorContext();
+        final MockProcessorContext<String, String> context = new MockProcessorContext<>();
 
         // Create, initialize, and register the state store.
         final KeyValueStore<String, Integer> store =
@@ -48,7 +48,7 @@ public class WordCountProcessorTest {
         context.register(store, null);
 
         // Create and initialize the processor under test
-        final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
+        final Processor<String, String, String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
         processor.init(context);
 
         // send a record to the processor
@@ -61,7 +61,7 @@ public class WordCountProcessorTest {
         context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
 
         // finally, we can verify the output.
-        final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
+        final Iterator<MockProcessorContext.CapturedForward<String, String>> capturedForwards = context.forwarded().iterator();
         assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue());
         assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue());
         assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index df53ee2..d143f69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -49,6 +51,27 @@ public interface StateStore {
      * Initializes this state store.
      * <p>
      * The implementation of this function must register the root store in the context via the
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+     * the second parameter should be an instance of the user's implementation
+     * of the {@link StateRestoreCallback} interface, used to restore the state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users can instead implement the
+     * {@link BatchingStateRestoreCallback} interface, which extends {@link StateRestoreCallback}, to
+     * supply bulk-load restoration logic instead of restoring one record at a time.
+     * <p>
+     * This method is not called if the new {@link StateStore#init(ProcessorContext, StateStore)}
+     * overload is implemented directly.
+     *
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
+
+    /**
+     * Initializes this state store.
+     * <p>
+     * The implementation of this function must register the root store in the context via the
      * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
      * first {@link StateStore} parameter should always be the passed-in {@code root} object, and
      * the second parameter should be an object of user's implementation
@@ -61,7 +84,14 @@ public interface StateStore {
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final ProcessorContext<?, ?> context, final StateStore root) {
+        final org.apache.kafka.streams.processor.ProcessorContext adapted =
+            ProcessorContextReverseAdapter.adapt(
+                context,
+                new ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder()
+            );
+        init(adapted, root);
+    }
 
     /**
      * Flush any cached data
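The effect of the StateStore change above: the old init signature stays abstract, while the new default
overload adapts a new-API context and delegates to it, so existing store implementations keep compiling
and keep working when handed an org.apache.kafka.streams.processor.api.ProcessorContext. A hedged sketch
of such a legacy store (the class and store name are hypothetical, and the restore callback is a no-op
for brevity):

    import org.apache.kafka.streams.processor.StateStore;

    public class LegacyNoopStore implements StateStore {
        private boolean open = false;

        @Override
        public String name() {
            return "legacy-noop";
        }

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context,
                         final StateStore root) {
            // Register the root store, as the javadoc above requires; restoration is a no-op.
            context.register(root, (key, value) -> { });
            open = true;
        }

        @Override
        public void flush() { }

        @Override
        public void close() {
            open = false;
        }

        @Override
        public boolean persistent() {
            return false;
        }

        @Override
        public boolean isOpen() {
            return open;
        }
    }

When a caller invokes the new default init(ProcessorContext<?, ?>, StateStore), the
ProcessorContextReverseAdapter wraps the typed context and the call lands in the method above.
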
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
similarity index 96%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
index 6e82a5e..11fc1f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
@@ -34,18 +34,18 @@ import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
+public final class InternalProcessorContextReverseAdapter implements InternalProcessorContext {
     private final InternalApiProcessorContext<Object, Object> delegate;
 
     static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
         if (delegate instanceof ProcessorContextAdapter) {
             return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
         } else {
-            return new ProcessorContextReverseAdapter(delegate);
+            return new InternalProcessorContextReverseAdapter(delegate);
         }
     }
 
-    private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
+    private InternalProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
         this.delegate = delegate;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index d8e4af4..3bbbe78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -47,7 +47,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
     @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext<KOut, VOut> context) {
-        delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+        delegate.init(InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
index 85dbd52..8d18ec7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
@@ -42,8 +42,8 @@ public final class ProcessorContextAdapter<KForward, VForward>
 
     @SuppressWarnings("unchecked")
     public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) {
-        if (delegate instanceof ProcessorContextReverseAdapter) {
-            return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
+        if (delegate instanceof InternalProcessorContextReverseAdapter) {
+            return (InternalApiProcessorContext<KForward, VForward>) ((InternalProcessorContextReverseAdapter) delegate).delegate();
         } else {
             return new ProcessorContextAdapter<>(delegate);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
index 6e82a5e..131b776 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
@@ -18,35 +18,52 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
+public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext {
+    private final ProcessorContext<Object, Object> delegate;
+    private final DeprecatedForwarder deprecatedForwarder;
 
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+    public interface DeprecatedForwarder {
+        <K, V> void forward(final K key, final V value, final int childIndex);
+    }
+
+    public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder {
+        @Override
+        public <K, V> void forward(final K key, final V value, final int childIndex) {
+            throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate,
+                                                                            final DeprecatedForwarder deprecatedForwarder) {
         if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
+            return ((ProcessorContextAdapter<?, ?>) delegate).delegate();
+        } else if (delegate instanceof InternalApiProcessorContext) {
+            return InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) delegate);
         } else {
-            return new ProcessorContextReverseAdapter(delegate);
+            return new ProcessorContextReverseAdapter(delegate, deprecatedForwarder);
         }
     }
 
-    private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
-        this.delegate = delegate;
+    @SuppressWarnings("unchecked")
+    private ProcessorContextReverseAdapter(final ProcessorContext<?, ?> delegate,
+                                           final DeprecatedForwarder deprecatedForwarder) {
+        this.delegate = (ProcessorContext<Object, Object>) delegate;
+        this.deprecatedForwarder = deprecatedForwarder;
     }
 
     @Override
@@ -75,91 +92,11 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
     }
 
     @Override
-    public StreamsMetricsImpl metrics() {
+    public StreamsMetrics metrics() {
         return delegate.metrics();
     }
 
     @Override
-    public void setSystemTimeMs(final long timeMs) {
-        delegate.setSystemTimeMs(timeMs);
-    }
-
-    @Override
-    public long currentSystemTimeMs() {
-        return delegate.currentSystemTimeMs();
-    }
-
-    @Override
-    public ProcessorRecordContext recordContext() {
-        return delegate.recordContext();
-    }
-
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        delegate.setRecordContext(recordContext);
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
-        delegate.setCurrentNode(currentNode);
-    }
-
-    @Override
-    public ProcessorNode<?, ?, ?, ?> currentNode() {
-        return delegate.currentNode();
-    }
-
-    @Override
-    public ThreadCache cache() {
-        return delegate.cache();
-    }
-
-    @Override
-    public void initialize() {
-        delegate.initialize();
-    }
-
-    @Override
-    public void uninitialize() {
-        delegate.uninitialize();
-    }
-
-    @Override
-    public Task.TaskType taskType() {
-        return delegate.taskType();
-    }
-
-    @Override
-    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
-        delegate.transitionToActive(streamTask, recordCollector, newCache);
-    }
-
-    @Override
-    public void transitionToStandby(final ThreadCache newCache) {
-        delegate.transitionToStandby(newCache);
-    }
-
-    @Override
-    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
-        delegate.registerCacheFlushListener(namespace, listener);
-    }
-
-    @Override
-    public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
-        return delegate.getStateStore(builder);
-    }
-
-    @Override
-    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
-        delegate.logChange(storeName, key, value, timestamp);
-    }
-
-    @Override
-    public String changelogFor(final String storeName) {
-        return delegate.changelogFor(storeName);
-    }
-
-    @Override
     public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
         delegate.register(store, stateRestoreCallback);
     }
@@ -176,7 +113,7 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
     }
 
     @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
+    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
         return delegate.schedule(interval, type, callback);
     }
 
@@ -193,7 +130,7 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
     @Deprecated
     @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name()));
+        deprecatedForwarder.forward(key, value, childIndex);
     }
 
     @Deprecated
@@ -241,8 +178,4 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo
     public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return delegate.appConfigsWithPrefix(prefix);
     }
-
-    InternalApiProcessorContext<Object, Object> delegate() {
-        return delegate;
-    }
 }
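Since the new api.ProcessorContext has no notion of forwarding to a child by index (deprecated since
2.0), the rewritten adapter above takes a DeprecatedForwarder strategy instead of resolving child names
itself. A short sketch of how a caller adapts a new-API context while declining index-based forwarding
(newApiContext is a placeholder for any org.apache.kafka.streams.processor.api.ProcessorContext):

    final org.apache.kafka.streams.processor.ProcessorContext adapted =
        ProcessorContextReverseAdapter.adapt(
            newApiContext,
            new ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder()
        );

    adapted.forward("key", "value");        // delegates to the new-API context
    // adapted.forward("key", "value", 0);  // would throw UnsupportedOperationException
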
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index ad8cd0a..e180010 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -142,7 +143,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -151,7 +152,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -160,7 +161,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -169,7 +170,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -178,7 +179,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForSessionStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
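Why the casts in the test hunks above are now needed: StateStore#init is overloaded, so a bare null
context argument is ambiguous between the deprecated
init(org.apache.kafka.streams.processor.ProcessorContext, StateStore) and the new
init(ProcessorContext<?, ?>, StateStore). A minimal sketch:

    store.init(null, null);                    // no longer compiles: ambiguous overload
    store.init((ProcessorContext) null, null); // explicitly selects the deprecated signature
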
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index f4b62c3d..ab88efa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -767,7 +767,7 @@ public class ProcessorContextImplTest {
         assertTrue(store.persistent());
         assertTrue(store.isOpen());
 
-        checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+        checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()");
         checkThrowsUnsupportedOperation(store::close, "close()");
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
new file mode 100644
index 0000000..acad94d
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockApiProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private String topic;
+    private Integer partition;
+    private Long offset;
+    private Headers headers;
+    private Long timestamp;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<KForward, VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final long intervalMs;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) {
+            this.intervalMs = intervalMs;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        @SuppressWarnings("unused")
+        public long getIntervalMs() {
+            return intervalMs;
+        }
+
+        @SuppressWarnings("unused")
+        public PunctuationType getType() {
+            return type;
+        }
+
+        @SuppressWarnings("unused")
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        @SuppressWarnings({"WeakerAccess", "unused"})
+        public void cancel() {
+            cancelled = true;
+        }
+
+        @SuppressWarnings("unused")
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<KForward, VForward> {
+        private final String childName;
+        private final long timestamp;
+        private final KeyValue<KForward, VForward> keyValue;
+
+        private CapturedForward(final To to, final KeyValue<KForward, VForward> keyValue) {
+            if (keyValue == null) {
+                throw new IllegalArgumentException("keyValue can't be null");
+            }
+
+            try {
+                final Field field = To.class.getDeclaredField("childName");
+                field.setAccessible(true);
+                childName = (String) field.get(to);
+            } catch (final IllegalAccessException | NoSuchFieldException e) {
+                throw new RuntimeException(e);
+            }
+            timestamp = getTimestamp(to);
+
+            this.keyValue = keyValue;
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return The child name, or {@code null} if it was broadcast.
+         */
+        @SuppressWarnings("unused")
+        public String childName() {
+            return childName;
+        }
+
+        /**
+         * The timestamp attached to the forwarded record.
+         *
+         * @return A timestamp, or {@code -1} if none was forwarded.
+         */
+        @SuppressWarnings("unused")
+        public long timestamp() {
+            return timestamp;
+        }
+
+        /**
+         * The data forwarded.
+         *
+         * @return A key/value pair. Not null.
+         */
+        @SuppressWarnings("unused")
+        public KeyValue<KForward, VForward> keyValue() {
+            return keyValue;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "childName='" + childName + '\'' +
+                ", timestamp=" + timestamp +
+                ", keyValue=" + keyValue +
+                '}';
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with a dummy {@code config}, a dummy {@code taskId}, and a {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    @SuppressWarnings("unused")
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a dummy {@code taskId} and a {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    @SuppressWarnings("unused")
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available via {@link MockProcessorContext#stateDir()}.
+     */
+    @SuppressWarnings("unused")
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return stateDir;
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes this record metadata for use in the processor. Normally, it is set by the
+     * Kafka Streams framework, but for the purpose of driving unit tests, you can set it directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     * @param headers   Record headers
+     * @param timestamp A record timestamp
+     */
+    @SuppressWarnings("unused")
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset,
+                                  final Headers headers,
+                                  final long timestamp) {
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.headers = headers;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param topic A topic name
+     */
+    @SuppressWarnings("unused")
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param partition A partition number
+     */
+    @SuppressWarnings("unused")
+    public void setPartition(final int partition) {
+        this.partition = partition;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param offset A record offset
+     */
+    @SuppressWarnings("unused")
+    public void setOffset(final long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param headers Record headers
+     */
+    @SuppressWarnings("unused")
+    public void setHeaders(final Headers headers) {
+        this.headers = headers;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param timestamp A record timestamp
+     */
+    @SuppressWarnings("unused")
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String topic() {
+        if (topic == null) {
+            throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic().");
+        }
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        if (partition == null) {
+            throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition().");
+        }
+        return partition;
+    }
+
+    @Override
+    public long offset() {
+        if (offset == null) {
+            throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset().");
+        }
+        return offset;
+    }
+
+    @Override
+    public Headers headers() {
+        return headers;
+    }
+
+    @Override
+    public long timestamp() {
+        if (timestamp == null) {
+            throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
+        }
+        return timestamp;
+    }
+
+    // mocks ================================================
+
+    @Override
+    public void register(final StateStore store,
+                         final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
+        stateStores.put(store.name(), store);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator =
+            new CapturedPunctuator(ApiUtils.validateMillisecondDuration(interval, "interval"), type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return capturedPunctuator::cancel;
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    @SuppressWarnings("unused")
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        return new LinkedList<>(punctuators);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final K key, final V value) {
+        forward(key, value, To.all());
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) {
+        capturedForwards.add(
+            new CapturedForward<>(
+                (getTimestamp(to)) == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to,
+                new KeyValue<>(key, value)
+            )
+        );
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    public List<CapturedForward<KForward, VForward>> forwarded() {
+        return new LinkedList<>(capturedForwards);
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for; broadcast forwards (those with no child name) are also included
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    @SuppressWarnings("unused")
+    public List<CapturedForward<KForward, VForward>> forwarded(final String childName) {
+        final LinkedList<CapturedForward<KForward, VForward>> result = new LinkedList<>();
+        for (final CapturedForward<KForward, VForward> capture : capturedForwards) {
+            if (capture.childName() == null || capture.childName().equals(childName)) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    @SuppressWarnings("unused")
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    @SuppressWarnings("unused")
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+        );
+    }
+
+    private static long getTimestamp(final To to) {
+        final long timestamp;
+        try {
+            final Field field = To.class.getDeclaredField("timestamp");
+            field.setAccessible(true);
+            timestamp = (long) field.get(to);
+        } catch (final IllegalAccessException | NoSuchFieldException e) {
+            throw new RuntimeException(e);
+        }
+        return timestamp;
+    }
+}
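A compact sketch of the punctuation capture the new mock provides, drawn from the class above (the mock
takes no automated actions, so the test drives the punctuator by hand); the test file that follows
exercises the same surface in more depth:

    final MockProcessorContext<String, String> context = new MockProcessorContext<>();
    context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, ts -> context.commit());

    // Nothing fires on its own; fetch the captured punctuator and fire it manually.
    final MockProcessorContext.CapturedPunctuator punctuator = context.scheduledPunctuators().get(0);
    punctuator.getPunctuator().punctuate(0L);
    assert context.committed();
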
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
new file mode 100644
index 0000000..6e9c5a6
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MockApiProcessorContextTest {
+    @Test
+    public void shouldCaptureOutputRecords() {
+        final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+            private ProcessorContext<String, Long> context;
+
+            @Override
+            public void init(final ProcessorContext<String, Long> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+                context.forward(key + value, key.length() + value);
+            }
+        };
+
+        final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureOutputRecordsUsingTo() {
+        final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+            private ProcessorContext<String, Long> context;
+
+            @Override
+            public void init(final ProcessorContext<String, Long> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+                context.forward(key + value, key.length() + value, To.all());
+            }
+        };
+
+        final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureRecordsOutputToChildByName() {
+        final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+            private int count = 0;
+            private ProcessorContext<String, Long> context;
+
+            @Override
+            public void init(final ProcessorContext<String, Long> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (count == 0) {
+                    context.forward("start", -1L, To.all()); // broadcast
+                }
+                final To toChild = count % 2 == 0 ? To.child("george") : To.child("pete");
+                context.forward(key + value, key.length() + value, toChild);
+                count++;
+            }
+        };
+
+        final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        {
+            final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator();
+
+            final CapturedForward<String, Long> forward1 = forwarded.next();
+            assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
+            assertNull(forward1.childName());
+
+            final CapturedForward<String, Long> forward2 = forwarded.next();
+            assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
+            assertEquals("george", forward2.childName());
+
+            final CapturedForward<String, Long> forward3 = forwarded.next();
+            assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
+            assertEquals("pete", forward3.childName());
+
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("george").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("pete").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("steve").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldCaptureCommitsAndAllowReset() {
+        final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+            private int count = 0;
+            private ProcessorContext<Void, Void> context;
+
+            @Override
+            public void init(final ProcessorContext<Void, Void> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (++count > 2) {
+                    context.commit();
+                }
+            }
+        };
+
+        final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        assertFalse(context.committed());
+
+        processor.process("foobar", 500L);
+
+        assertTrue(context.committed());
+
+        context.resetCommit();
+
+        assertFalse(context.committed());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldStoreAndReturnStateStores() {
+        final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+            private ProcessorContext<Void, Void> context;
+
+            @Override
+            public void init(final ProcessorContext<Void, Void> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+                final KeyValueStore<String, Long> stateStore = context.getStateStore("my-state");
+                stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
+                stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
+            }
+        };
+
+        final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("my-state"),
+                Serdes.String(),
+                Serdes.Long()).withLoggingDisabled();
+
+        final KeyValueStore<String, Long> store = storeBuilder.build();
+
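+        // StateStore.init() still takes the pre-KIP-478 ProcessorContext, so adapt the new-API mock;
+        // forwarding is never exercised here, so the deprecated forwarder can simply throw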
+        store.init(ProcessorContextReverseAdapter.adapt(context, new ProcessorContextReverseAdapter.DeprecatedForwarder() {
+            @Override
+            public <K, V> void forward(final K key, final V value, final int childIndex) {
+                throw new UnsupportedOperationException();
+            }
+        }), store);
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("bar", 50L);
+
+        assertEquals(5L, (long) store.get("foo"));
+        assertEquals(50L, (long) store.get("bar"));
+        assertEquals(55L, (long) store.get("all"));
+    }
+
+    @Test
+    public void shouldCaptureApplicationAndRecordMetadata() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+
+        final Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() {
+            private ProcessorContext<String, Object> context;
+
+            @Override
+            public void init(final ProcessorContext<String, Object> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final String key, final Object value) {
+                context.forward("appId", context.applicationId());
+                context.forward("taskId", context.taskId());
+
+                context.forward("topic", context.topic());
+                context.forward("partition", context.partition());
+                context.forward("offset", context.offset());
+                context.forward("timestamp", context.timestamp());
+
+                context.forward("key", key);
+                context.forward("value", value);
+            }
+        };
+
+        final MockProcessorContext<String, Object> context = new MockProcessorContext<>(config);
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an exception.");
+        } catch (final IllegalStateException expected) {
+            // expected, since the record metadata isn't initialized
+        }
+
+        context.resetForwards();
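+        // attach record metadata so the context can answer topic/partition/offset/timestamp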
+        context.setRecordMetadata("t1", 0, 0L, null, 0L);
+
+        {
+            processor.process("foo", 5L);
+            final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+
+        // record metadata should be "sticky"
+        context.setOffset(1L);
+        context.setTimestamp(10L);
+
+        {
+            processor.process("bar", 50L);
+            final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+
+        // record metadata should be "sticky"
+        context.setTopic("t2");
+        context.setPartition(30);
+
+        {
+            processor.process("baz", 500L);
+            final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue());
+        }
+    }
+
+    @Test
+    public void shouldCapturePunctuator() {
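+        // a processor that schedules a wall-clock punctuator which commits when it fires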
+        final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+            @Override
+            public void init(final ProcessorContext<Void, Void> context) {
+                context.schedule(
+                    Duration.ofSeconds(1L),
+                    PunctuationType.WALL_CLOCK_TIME,
+                    timestamp -> context.commit()
+                );
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+            }
+        };
+
+        final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+        processor.init(context);
+
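+        // the mock context captures scheduled punctuators instead of running them on a timer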
+        final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+        assertEquals(1000L, capturedPunctuator.getIntervalMs());
+        assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType());
+        assertFalse(capturedPunctuator.cancelled());
+
+        final Punctuator punctuator = capturedPunctuator.getPunctuator();
+        assertFalse(context.committed());
+        punctuator.punctuate(1234L);
+        assertTrue(context.committed());
+    }
+
+    @Test
+    public void fullConstructorShouldSetAllExpectedAttributes() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+
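+        // any File will do: the mock context just hands back whatever state dir it was given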
+        final File dummyFile = new File("");
+        final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(config, new TaskId(1, 1), dummyFile);
+
+        assertEquals("testFullConstructor", context.applicationId());
+        assertEquals(new TaskId(1, 1), context.taskId());
+        assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
+        assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id"));
+        assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
+        assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
+        assertEquals(dummyFile, context.stateDir());
+    }
+}