You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/08 16:31:30 UTC

[kafka] 01/01: KAFKA-10437: Implement new PAPI support for test-utils

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-6-test-utils
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 178eda5dd6952a851d9786ded4a99bafa10ddd9d
Author: John Roesler <vv...@apache.org>
AuthorDate: Wed Oct 7 21:40:24 2020 -0500

    KAFKA-10437: Implement new PAPI support for test-utils
---
 .../apache/kafka/streams/processor/api/Record.java |  28 ++
 .../state/internals/InMemorySessionStore.java      |  30 +-
 .../processor/api/MockProcessorContext.java        | 491 +++++++++++++++++++++
 .../kafka/streams/MockProcessorContextTest.java    |   8 +-
 .../streams/test/MockProcessorContextAPITest.java  | 352 +++++++++++++++
 .../test/MockProcessorContextStateStoreTest.java   | 200 +++++++++
 6 files changed, 1093 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
index 652a647..ab8844c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.errors.StreamsException;
 
+import java.util.Objects;
+
 /**
  * A data class representing an incoming record for processing in a {@link Processor}
  * or a record to forward to downstream processors via {@link ProcessorContext}.
@@ -162,4 +164,30 @@ public class Record<K, V> {
     public Record<K, V> withHeaders(final Headers headers) {
         return new Record<>(key, value, timestamp, headers);
     }
+
+    @Override
+    public String toString() {
+        return "Record{" +
+            "key=" + key +
+            ", value=" + value +
+            ", timestamp=" + timestamp +
+            ", headers=" + headers +
+            '}';
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final Record<?, ?> record = (Record<?, ?>) o;
+        return timestamp == record.timestamp &&
+            Objects.equals(key, record.key) &&
+            Objects.equals(value, record.value) &&
+            Objects.equals(headers, record.headers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, value, timestamp, headers);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 46c4de2..f08f9d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -75,18 +75,22 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        this.context = (InternalProcessorContext) context;
-
-        final StreamsMetricsImpl metrics = this.context.metrics();
         final String threadId = Thread.currentThread().getName();
         final String taskName = context.taskId().toString();
-        expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
-            threadId,
-            taskName,
-            metricScope,
-            name,
-            metrics
-        );
+
+        if (context instanceof InternalProcessorContext) {
+            this.context = (InternalProcessorContext) context;
+            final StreamsMetricsImpl metrics = this.context.metrics();
+            expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
+                threadId,
+                taskName,
+                metricScope,
+                name,
+                metrics
+            );
+        } else {
+            this.context = null;
+        }
 
         if (root != null) {
             context.register(root, (key, value) -> put(SessionKeySchema.from(Bytes.wrap(key)), value));
@@ -102,7 +106,11 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
         observedStreamTime = Math.max(observedStreamTime, windowEndTimestamp);
 
         if (windowEndTimestamp <= observedStreamTime - retentionPeriod) {
-            expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
+            // The provided context is not required to implement InternalProcessorContext.
+            // If it doesn't, we can't record this metric.
+            if (context != null) {
+                expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
+            }
             LOG.warn("Skipping record for expired segment.");
         } else {
             if (aggregate != null) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
new file mode 100644
index 0000000..8dfa375
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.test.MockProcessorContextAPITest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private MockRecordMetadata recordMetadata;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    private static final class MockRecordMetadata implements RecordMetadata {
+        private final String topic;
+        private final int partition;
+        private final long offset;
+
+        private MockRecordMetadata(final String topic, final int partition, final long offset) {
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public int partition() {
+            return partition;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final Duration interval;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Duration getInterval() {
+            return interval;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<K, V> {
+
+        private final Record<K, V> record;
+        private final Optional<String> childName;
+
+        public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+            this.record = Objects.requireNonNull(record);
+            this.childName = Objects.requireNonNull(childName);
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return If present, the child name the record was forwarded to.
+         *         If empty, the forward was a broadcast.
+         */
+        public Optional<String> childName() {
+            return childName;
+        }
+
+        /**
+         * The record that was forwarded.
+         *
+         * @return The forwarded record. Not null.
+         */
+        public Record<K, V> record() {
+            return record;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "record=" + record +
+                ", childName=" + childName +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+            return Objects.equals(record, that.record) &&
+                Objects.equals(childName, that.childName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(record, childName);
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available via {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return Objects.requireNonNull(
+            stateDir,
+            "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+                "You can either reconfigure your test so that it doesn't need access to the disk " +
+                "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+                "a non-null stateDir argument."
+        );
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     */
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset) {
+        recordMetadata = new MockRecordMetadata(topic, partition, offset);
+    }
+
+    @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordMetadata);
+    }
+
+    // mocks ================================================
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
+    }
+
+    public <S extends StateStore> void addStateStore(final S stateStore) {
+        stateStores.put(stateStore.name(), stateStore);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(interval, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return capturedPunctuator::cancel;
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        return new LinkedList<>(punctuators);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record, final String childName) {
+        capturedForwards.add(new CapturedForward<>(record, Optional.ofNullable(childName)));
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded() {
+        return new LinkedList<>(capturedForwards);
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * Broadcast forwards (those made without a child name) are included as well.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
+        final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
+        for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
+            if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+        );
+    }
+
+    /**
+     * Used to get a {@link StateStoreContext} for use with
+     * {@link StateStore#init(StateStoreContext, StateStore)}
+     * if you need to initialize a store for your tests.
+     * @return a {@link StateStoreContext} that delegates to this ProcessorContext.
+     */
+    public StateStoreContext getStateStoreContext() {
+        return new StateStoreContext() {
+            @Override
+            public String applicationId() {
+                return MockProcessorContext.this.applicationId();
+            }
+
+            @Override
+            public TaskId taskId() {
+                return MockProcessorContext.this.taskId();
+            }
+
+            @Override
+            public Serde<?> keySerde() {
+                return MockProcessorContext.this.keySerde();
+            }
+
+            @Override
+            public Serde<?> valueSerde() {
+                return MockProcessorContext.this.valueSerde();
+            }
+
+            @Override
+            public File stateDir() {
+                return MockProcessorContext.this.stateDir();
+            }
+
+            @Override
+            public StreamsMetrics metrics() {
+                return MockProcessorContext.this.metrics();
+            }
+
+            @Override
+            public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+                stateStores.put(store.name(), store);
+            }
+
+            @Override
+            public Map<String, Object> appConfigs() {
+                return MockProcessorContext.this.appConfigs();
+            }
+
+            @Override
+            public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+                return MockProcessorContext.this.appConfigsWithPrefix(prefix);
+            }
+        };
+    }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 6e2f4ed..85d0a8b 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -43,6 +43,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation") // this is a test of a deprecated API
 public class MockProcessorContextTest {
     @Test
     public void shouldCaptureOutputRecords() {
@@ -160,7 +161,6 @@ public class MockProcessorContextTest {
     @Test
     public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
-            @SuppressWarnings("deprecation")
             @Override
             public void process(final String key, final Long value) {
                 context().forward(key, value, 0);
@@ -182,7 +182,6 @@ public class MockProcessorContextTest {
     @Test
     public void shouldThrowIfForwardedWithDeprecatedChildName() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
-            @SuppressWarnings("deprecation")
             @Override
             public void process(final String key, final Long value) {
                 context().forward(key, value, "child1");
@@ -231,14 +230,13 @@ public class MockProcessorContextTest {
 
         assertFalse(context.committed());
     }
-    @SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437
+
     @Test
     public void shouldStoreAndReturnStateStores() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
             @Override
             public void process(final String key, final Long value) {
-                @SuppressWarnings("unchecked")
-                final KeyValueStore<String, Long> stateStore = (KeyValueStore<String, Long>) context().getStateStore("my-state");
+                final KeyValueStore<String, Long> stateStore = context().getStateStore("my-state");
                 stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
                 stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
             }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
new file mode 100644
index 0000000..074acb7
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockProcessorContextAPITest {
+    @Test // Verifies that records forwarded by a processor are captured by the mock context and can be reset.
+    public void shouldCaptureOutputRecords() {
+        final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+            private ProcessorContext<String, Long> context;
+
+            @Override
+            public void init(final ProcessorContext<String, Long> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final Record<String, Long> record) {
+                final String key = record.key();
+                final Long value = record.value();
+                context.forward(record.withKey(key + value).withValue(key.length() + value)); // key -> key + value (concatenated), value -> key length + value
+            }
+        };
+
+        final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+        processor.init(context);
+
+        processor.process(new Record<>("foo", 5L, 0L));
+        processor.process(new Record<>("barbaz", 50L, 0L));
+
+        final List<CapturedForward<? extends String, ? extends Long>> actual = context.forwarded();
+        final List<CapturedForward<String, Long>> expected = asList(
+            new CapturedForward<>(new Record<>("foo5", 8L, 0L), Optional.empty()), // "foo" + 5 and 3 + 5; forwarded without a child name
+            new CapturedForward<>(new Record<>("barbaz50", 56L, 0L), Optional.empty()) // "barbaz" + 50 and 6 + 50
+        );
+        assertThat(actual, is(expected));
+
+        context.resetForwards(); // after a reset the context must report no forwards
+
+        assertThat(context.forwarded(), empty());
+    }
+
+    @Test // Verifies that forwards addressed to a named child are captured and can be queried per child name.
+    public void shouldCaptureRecordsOutputToChildByName() {
+        final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
+            private ProcessorContext<String, Long> context;
+
+            @Override
+            public void process(final Record<String, Long> record) {
+                final String key = record.key();
+                final Long value = record.value();
+                if (count == 0) { // only the very first call emits the extra "start" record
+                    context.forward(new Record<>("start", -1L, 0L)); // broadcast
+                }
+                final String toChild = count % 2 == 0 ? "george" : "pete"; // alternate targets: even calls -> "george", odd calls -> "pete"
+                context.forward(new Record<>(key + value, key.length() + value, 0L), toChild);
+                count++;
+            }
+
+            @Override
+            public void init(final ProcessorContext<String, Long> context) {
+                this.context = context;
+            }
+
+            private int count = 0; // number of records processed so far
+
+        };
+
+        final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
+
+        processor.init(context);
+
+        processor.process(new Record<>("foo", 5L, 0L));
+        processor.process(new Record<>("barbaz", 50L, 0L));
+
+        { // all forwards, in the order they were made
+            final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded();
+            final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
+                new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("foo5", 8L, 0L), Optional.of("george")),
+                new CapturedForward<>(new Record<>("barbaz50", 56L, 0L), Optional.of("pete"))
+            );
+
+            assertThat(forwarded, is(expected));
+        }
+        { // querying by child name is expected to return the broadcast record plus that child's records
+            final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("george");
+            final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
+                new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("foo5", 8L, 0L), Optional.of("george"))
+            );
+
+            assertThat(forwarded, is(expected));
+        }
+        { // same check for the second child
+            final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("pete");
+            final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
+                new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("barbaz50", 56L, 0L), Optional.of("pete"))
+            );
+
+            assertThat(forwarded, is(expected));
+        }
+        { // "steve" was never targeted by name, so only the broadcast record is expected
+            final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("steve");
+            final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
+                new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty())
+            );
+
+            assertThat(forwarded, is(expected));
+        }
+    }
+
+    @Test // Verifies that a commit request is recorded by the mock context and that resetCommit() clears it.
+    public void shouldCaptureCommitsAndAllowReset() {
+        final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+            private ProcessorContext<Void, Void> context;
+            private int count = 0; // number of records processed so far
+
+            @Override
+            public void init(final ProcessorContext<Void, Void> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final Record<String, Long> record) {
+                if (++count > 2) { // request a commit from the third record onwards
+                    context.commit();
+                }
+            }
+        };
+
+        final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+        processor.init(context);
+
+        processor.process(new Record<>("foo", 5L, 0L));
+        processor.process(new Record<>("barbaz", 50L, 0L));
+
+        assertThat(context.committed(), is(false)); // two records processed: no commit requested yet
+
+        processor.process(new Record<>("foobar", 500L, 0L));
+
+        assertThat(context.committed(), is(true)); // the third record triggered the commit
+
+        context.resetCommit();
+
+        assertThat(context.committed(), is(false)); // the flag is cleared by the reset
+    }
+
+    @Test // Verifies that a store initialized against the mock context can be looked up by name and updated by a processor.
+    public void shouldStoreAndReturnStateStores() {
+        final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+            private ProcessorContext<Void, Void> context;
+
+            @Override
+            public void init(final ProcessorContext<Void, Void> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final Record<String, Long> record) {
+                final String key = record.key();
+                final Long value = record.value();
+                final KeyValueStore<String, Long> stateStore = context.getStateStore("my-state"); // same name as the store built below
+
+                stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value); // per-key running sum; a missing entry counts as 0
+                stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value); // running sum over all records under the key "all"
+            }
+
+        };
+
+        final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
+
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("my-state"),
+            Serdes.String(),
+            Serdes.Long()).withLoggingDisabled();
+
+        final KeyValueStore<String, Long> store = storeBuilder.build();
+
+        store.init(context.getStateStoreContext(), store); // NOTE(review): presumably this is what makes "my-state" retrievable via getStateStore - confirm in MockProcessorContext
+
+        processor.init(context);
+
+        processor.process(new Record<>("foo", 5L, 0L));
+        processor.process(new Record<>("bar", 50L, 0L));
+
+        assertThat(store.get("foo"), is(5L));
+        assertThat(store.get("bar"), is(50L));
+        assertThat(store.get("all"), is(55L)); // 5 + 50
+    }
+
+
+    @Test // Verifies that application id, task id and (when set) record metadata are visible to a processor through the mock context.
+    public void shouldCaptureApplicationAndRecordMetadata() {
+        final Properties config = mkProperties(
+            mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata"),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )
+        );
+
+        final Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() { // echoes what it reads from the context as forwarded records
+            private ProcessorContext<String, Object> context;
+
+            @Override
+            public void init(final ProcessorContext<String, Object> context) {
+                this.context = context;
+            }
+
+            @Override
+            public void process(final Record<String, Object> record) {
+                context.forward(new Record<String, Object>("appId", context.applicationId(), 0L));
+                context.forward(new Record<String, Object>("taskId", context.taskId(), 0L));
+
+                if (context.recordMetadata().isPresent()) { // topic/partition/offset are forwarded only when metadata is available
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    context.forward(new Record<String, Object>("topic", recordMetadata.topic(), 0L));
+                    context.forward(new Record<String, Object>("partition", recordMetadata.partition(), 0L));
+                    context.forward(new Record<String, Object>("offset", recordMetadata.offset(), 0L));
+                }
+
+                context.forward(new Record<String, Object>("record", record, 0L)); // the input record itself is forwarded as the value
+            }
+        };
+
+        final MockProcessorContext<String, Object> context = new MockProcessorContext<>(config);
+        processor.init(context);
+
+        processor.process(new Record<>("foo", 5L, 0L));
+        { // first pass: no record metadata has been set, so no topic/partition/offset forwards are expected
+            final List<CapturedForward<? extends String, ?>> forwarded = context.forwarded();
+            final List<CapturedForward<? extends String, ?>> expected = asList(
+                new CapturedForward<>(new Record<>("appId", "testMetadata", 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("taskId", new TaskId(0, 0), 0L), Optional.empty()), // no task id was passed to the constructor; TaskId(0, 0) is expected
+                new CapturedForward<>(new Record<>("record", new Record<>("foo", 5L, 0L), 0L), Optional.empty())
+            );
+            assertThat(forwarded, is(expected));
+        }
+        context.resetForwards();
+        context.setRecordMetadata("t1", 0, 0L); // arguments line up with the topic, partition and offset asserted below
+        processor.process(new Record<>("foo", 5L, 0L));
+        { // second pass: the metadata set above must now be forwarded as well
+            final List<CapturedForward<? extends String, ?>> forwarded = context.forwarded();
+            final List<CapturedForward<? extends String, ?>> expected = asList(
+                new CapturedForward<>(new Record<>("appId", "testMetadata", 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("taskId", new TaskId(0, 0), 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("topic", "t1", 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("partition", 0, 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("offset", 0L, 0L), Optional.empty()),
+                new CapturedForward<>(new Record<>("record", new Record<>("foo", 5L, 0L), 0L), Optional.empty())
+            );
+            assertThat(forwarded, is(expected));
+        }
+    }
+
+    @Test // Verifies that a punctuator scheduled in init() is captured by the mock context and can be fired manually.
+    public void shouldCapturePunctuator() {
+        final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
+            @Override
+            public void init(final ProcessorContext<Void, Void> context) { // parameterized to match Processor<String, Long, Void, Void> instead of a raw type
+                context.schedule(
+                    Duration.ofSeconds(1L),
+                    PunctuationType.WALL_CLOCK_TIME,
+                    timestamp -> context.commit() // the punctuator's only effect is a commit request, which the test can observe
+                );
+            }
+
+            @Override
+            public void process(final Record<String, Long> record) {}
+        };
+
+        final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(); // diamond, consistent with the other tests in this class
+
+        processor.init(context);
+
+        final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0); // init() scheduled exactly one punctuator
+        assertThat(capturedPunctuator.getInterval(), is(Duration.ofMillis(1000L))); // equal to the Duration.ofSeconds(1L) passed to schedule()
+        assertThat(capturedPunctuator.getType(), is(PunctuationType.WALL_CLOCK_TIME));
+        assertThat(capturedPunctuator.cancelled(), is(false));
+
+        final Punctuator punctuator = capturedPunctuator.getPunctuator();
+        assertThat(context.committed(), is(false)); // scheduling alone must not have requested a commit
+        punctuator.punctuate(1234L); // fire the captured punctuator by hand; the timestamp is ignored by the lambda
+        assertThat(context.committed(), is(true));
+    }
+
+    @Test // Verifies that the (config, taskId, stateDir) constructor exposes all three arguments through the context's accessors.
+    public void fullConstructorShouldSetAllExpectedAttributes() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+
+        final File dummyFile = new File(""); // placeholder; only compared for equality with stateDir() below
+        final MockProcessorContext<Void, Void> context =
+            new MockProcessorContext<>(config, new TaskId(1, 1), dummyFile);
+
+        assertThat(context.applicationId(), is("testFullConstructor"));
+        assertThat(context.taskId(), is(new TaskId(1, 1)));
+        assertThat(context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG), is("testFullConstructor"));
+        assertThat(context.appConfigsWithPrefix("application.").get("id"), is("testFullConstructor")); // the lookup key "id" is the config name without the "application." prefix
+        assertThat(context.keySerde().getClass(), is(Serdes.String().getClass())); // serdes come from the default-serde configs set above
+        assertThat(context.valueSerde().getClass(), is(Serdes.Long().getClass()));
+        assertThat(context.stateDir(), is(dummyFile));
+    }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java
new file mode 100644
index 0000000..4662ddf
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(value = Parameterized.class)
+public class MockProcessorContextStateStoreTest {
+    private final StoreBuilder<StateStore> builder;
+    private final boolean timestamped;
+    private final boolean caching;
+    private final boolean logging;
+
+    @Parameterized.Parameters(name = "builder = {0}, timestamped = {1}, caching = {2}, logging = {3}") // Supplies one {builder, timestamped, caching, logging} row per store configuration under test.
+    public static Collection<Object[]> data() {
+        final List<Boolean> booleans = asList(true, false);
+
+        final List<Object[]> values = new ArrayList<>();
+
+        for (final Boolean timestamped : booleans) { // key-value stores: 2 x 2 x 2 flag combinations x 3 suppliers = 24 rows
+            for (final Boolean caching : booleans) {
+                for (final Boolean logging : booleans) {
+                    final List<KeyValueBytesStoreSupplier> keyValueBytesStoreSuppliers = asList(
+                        Stores.inMemoryKeyValueStore("kv" + timestamped + caching + logging), // the store name encodes the flag combination
+                        Stores.persistentKeyValueStore("kv" + timestamped + caching + logging),
+                        Stores.persistentTimestampedKeyValueStore("kv" + timestamped + caching + logging)
+                    );
+                    for (final KeyValueBytesStoreSupplier supplier : keyValueBytesStoreSuppliers) {
+                        final StoreBuilder<? extends KeyValueStore<String, ?>> builder;
+                        if (timestamped) {
+                            builder = Stores.timestampedKeyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+                        } else {
+                            builder = Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+                        }
+                        if (caching) { // NOTE(review): return values of the with* calls are discarded; assumes they configure the builder in place - confirm
+                            builder.withCachingEnabled();
+                        } else {
+                            builder.withCachingDisabled();
+                        }
+                        if (logging) {
+                            builder.withLoggingEnabled(Collections.emptyMap());
+                        } else {
+                            builder.withLoggingDisabled();
+                        }
+
+                        values.add(new Object[] {builder, timestamped, caching, logging});
+                    }
+                }
+            }
+        }
+
+        for (final Boolean timestamped : booleans) { // window stores: same scheme, another 24 rows
+            for (final Boolean caching : booleans) {
+                for (final Boolean logging : booleans) {
+                    final List<WindowBytesStoreSupplier> windowBytesStoreSuppliers = asList(
+                        Stores.inMemoryWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
+                        Stores.persistentWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
+                        Stores.persistentTimestampedWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false)
+                    );
+
+                    for (final WindowBytesStoreSupplier supplier : windowBytesStoreSuppliers) {
+                        final StoreBuilder<? extends WindowStore<String, ?>> builder;
+                        if (timestamped) {
+                            builder = Stores.timestampedWindowStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+                        } else {
+                            builder = Stores.windowStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+                        }
+                        if (caching) {
+                            builder.withCachingEnabled();
+                        } else {
+                            builder.withCachingDisabled();
+                        }
+                        if (logging) {
+                            builder.withLoggingEnabled(Collections.emptyMap());
+                        } else {
+                            builder.withLoggingDisabled();
+                        }
+
+                        values.add(new Object[] {builder, timestamped, caching, logging});
+                    }
+                }
+            }
+        }
+
+        for (final Boolean caching : booleans) { // session stores: no timestamped variant here, 2 x 2 combinations x 2 suppliers = 8 rows
+            for (final Boolean logging : booleans) {
+                final List<SessionBytesStoreSupplier> sessionBytesStoreSuppliers = asList(
+                    Stores.inMemorySessionStore("s" + caching + logging, Duration.ofSeconds(1)),
+                    Stores.persistentSessionStore("s" + caching + logging, Duration.ofSeconds(1))
+                );
+
+                for (final SessionBytesStoreSupplier supplier : sessionBytesStoreSuppliers) {
+                    final StoreBuilder<? extends SessionStore<String, ?>> builder =
+                        Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long());
+                    if (caching) {
+                        builder.withCachingEnabled();
+                    } else {
+                        builder.withCachingDisabled();
+                    }
+                    if (logging) {
+                        builder.withLoggingEnabled(Collections.emptyMap());
+                    } else {
+                        builder.withLoggingDisabled();
+                    }
+
+                    values.add(new Object[] {builder, false, caching, logging}); // timestamped is always reported as false for session stores
+                }
+            }
+        }
+
+        return values;
+    }
+
+    public MockProcessorContextStateStoreTest(final StoreBuilder<StateStore> builder, // receives one row produced by data(): {builder, timestamped, caching, logging}
+                                              final boolean timestamped,
+                                              final boolean caching,
+                                              final boolean logging) {
+
+        this.builder = builder; // builds the store exercised by the test method
+        this.timestamped = timestamped;
+        this.caching = caching;
+        this.logging = logging;
+    }
+
+    @Test // For the parameterized store: init must throw if caching or logging is enabled, otherwise init and close must succeed.
+    public void shouldEitherInitOrThrow() {
+        final File stateDir = TestUtils.tempDirectory();
+        try {
+            final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+                )),
+                new TaskId(0, 0),
+                stateDir
+            );
+            final StateStore store = builder.build();
+            if (caching || logging) { // the timestamped parameter does not affect the expected outcome
+                assertThrows(
+                    IllegalArgumentException.class,
+                    () -> store.init(context.getStateStoreContext(), store)
+                );
+            } else {
+                store.init(context.getStateStoreContext(), store);
+                store.close(); // close only the store that was successfully initialized
+            }
+        } finally { // always try to remove the temporary state directory, whatever the test outcome
+            try {
+                Utils.delete(stateDir);
+            } catch (final IOException e) {
+                // Failed to clean up the state dir. The JVM hooks will try again later.
+            }
+        }
+    }
+}