Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/02 23:50:31 UTC

[kafka] branch trunk updated: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts (#9361)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69790a1  KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts (#9361)
69790a1 is described below

commit 69790a1463bafc1f63e3c288a6636b3f4586c1b4
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Oct 2 18:49:12 2020 -0500

    KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts (#9361)
    
    Migrate different components of the old ProcessorContext interface
    into separate interfaces that are more appropriate for their usages.
    See KIP-478 for the details.
    
    Reviewers: Guozhang Wang <gu...@apache.org>, Paul Whalen <pg...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../examples/docs/DeveloperGuideTesting.java       |  23 +-
 .../kafka/streams/processor/ProcessorContext.java  |   5 +-
 .../apache/kafka/streams/processor/StateStore.java |  31 ++-
 .../kafka/streams/processor/StateStoreContext.java | 112 ++++++++++
 .../kafka/streams/processor/api/Processor.java     |   7 +-
 .../streams/processor/api/ProcessorContext.java    | 173 +++++++-------
 .../apache/kafka/streams/processor/api/Record.java | 165 ++++++++++++++
 .../ToInternal.java => api/RecordMetadata.java}    |  37 ++-
 .../internals/AbstractProcessorContext.java        |  72 +++---
 .../ForwardingDisabledProcessorContext.java        |   2 +-
 .../internals/GlobalProcessorContextImpl.java      |  27 ++-
 .../internals/GlobalStateManagerImpl.java          |   3 +-
 .../processor/internals/GlobalStateUpdateTask.java |   9 +-
 .../internals/InternalApiProcessorContext.java     | 119 ----------
 .../internals/InternalProcessorContext.java        |   5 +-
 .../processor/internals/ProcessorAdapter.java      |  25 ++-
 .../internals/ProcessorContextAdapter.java         | 235 -------------------
 .../processor/internals/ProcessorContextImpl.java  | 105 ++++++---
 .../internals/ProcessorContextReverseAdapter.java  | 248 ---------------------
 .../streams/processor/internals/ProcessorNode.java |  15 +-
 .../internals/ProcessorRecordContext.java          |   3 +-
 .../processor/internals/ProcessorStateManager.java |   3 +-
 .../streams/processor/internals/SinkNode.java      |  26 ++-
 .../streams/processor/internals/SourceNode.java    |   5 +-
 ...xt.java => StoreToProcessorContextAdapter.java} |  70 +++---
 .../streams/processor/internals/StreamTask.java    |  37 ++-
 .../streams/processor/internals/ToInternal.java    |   4 +
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  13 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |   5 +-
 .../org/apache/kafka/streams/TopologyTest.java     |   8 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   5 +-
 .../kstream/internals/KStreamTransformTest.java    |   5 +-
 .../KTableSuppressProcessorMetricsTest.java        |   3 +-
 .../suppress/KTableSuppressProcessorTest.java      |   3 +-
 .../internals/AbstractProcessorContextTest.java    |  46 ++--
 .../internals/GlobalProcessorContextImplTest.java  |  19 +-
 .../processor/internals/GlobalStateTaskTest.java   |   9 +-
 .../internals/GlobalStreamThreadTest.java          |   4 +-
 .../internals/ProcessorContextImplTest.java        |   2 +-
 .../processor/internals/ProcessorNodeTest.java     |   8 +-
 .../internals/ProcessorStateManagerTest.java       |   5 +-
 .../processor/internals/ProcessorTopologyTest.java |  36 +--
 .../streams/processor/internals/SinkNodeTest.java  |   3 +-
 .../processor/internals/StreamTaskTest.java        |  14 +-
 .../processor/internals/StreamThreadTest.java      |   4 +-
 .../internals/testutil/ConsumerRecordUtil.java     |  46 ++++
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  13 +-
 .../internals/AbstractSessionBytesStoreTest.java   |   9 +-
 .../internals/AbstractWindowBytesStoreTest.java    |  13 +-
 .../state/internals/CachingKeyValueStoreTest.java  |   5 +-
 .../state/internals/CachingSessionStoreTest.java   |   5 +-
 .../state/internals/CachingWindowStoreTest.java    |   5 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   3 +-
 .../ChangeLoggingSessionBytesStoreTest.java        |   6 +-
 ...geLoggingTimestampedKeyValueBytesStoreTest.java |   3 +-
 ...angeLoggingTimestampedWindowBytesStoreTest.java |   6 +-
 .../ChangeLoggingWindowBytesStoreTest.java         |   6 +-
 .../CompositeReadOnlyKeyValueStoreTest.java        |   3 +-
 .../internals/GlobalStateStoreProviderTest.java    |   3 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |   5 +-
 .../state/internals/MeteredSessionStoreTest.java   |   5 +-
 .../MeteredTimestampedKeyValueStoreTest.java       |   9 +-
 .../MeteredTimestampedWindowStoreTest.java         |  11 +-
 .../state/internals/MeteredWindowStoreTest.java    |  25 ++-
 .../streams/state/internals/RocksDBStoreTest.java  |  39 ++--
 .../internals/RocksDBTimestampedStoreTest.java     |  15 +-
 .../state/internals/RocksDBWindowStoreTest.java    |   7 +-
 .../state/internals/SegmentIteratorTest.java       |   5 +-
 .../internals/TimeOrderedKeyValueBufferTest.java   |  37 +--
 .../kafka/test/InternalMockProcessorContext.java   |  31 ++-
 .../org/apache/kafka/test/MockApiProcessor.java    |  21 +-
 .../kafka/test/MockInternalProcessorContext.java   |  19 ++
 .../java/org/apache/kafka/test/MockProcessor.java  |   8 +-
 .../org/apache/kafka/test/MockProcessorNode.java   |   5 +-
 .../java/org/apache/kafka/test/MockSourceNode.java |  11 +-
 .../apache/kafka/test/NoOpProcessorContext.java    |  15 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  18 +-
 .../streams/processor/MockProcessorContext.java    |   5 +-
 .../kafka/streams/TopologyTestDriverTest.java      | 126 ++++++-----
 80 files changed, 1131 insertions(+), 1172 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 454711c..d6b4f97 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -185,7 +185,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="StreamThreadTest.java"/>
+              files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
 
     <suppress checks="MethodLength"
               files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 90bed05..b48c803 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -20,17 +20,18 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -160,24 +161,24 @@ public class DeveloperGuideTesting {
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
-            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
-            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
+            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
             store = context.getStateStore("aggStore");
         }
 
         @Override
-        public void process(final String key, final Long value) {
-            final Long oldValue = store.get(key);
-            if (oldValue == null || value > oldValue) {
-                store.put(key, value);
+        public void process(final Record<String, Long> record) {
+            final Long oldValue = store.get(record.key());
+            if (oldValue == null || record.value() > oldValue) {
+                store.put(record.key(), record.value());
             }
         }
 
-        private void flushStore() {
+        private void flushStore(final long timestamp) {
             final KeyValueIterator<String, Long> it = store.all();
             while (it.hasNext()) {
                 final KeyValue<String, Long> next = it.next();
-                context.forward(next.key, next.value);
+                context.forward(new Record<>(next.key, next.value, timestamp));
             }
         }
     }
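
The migrated example above shows the two central KIP-478 changes: process() now takes a single Record, and punctuators forward Records carrying an explicit timestamp. For illustration only (a sketch, not part of this commit; the class name is assumed), a minimal stand-alone processor against the new API:

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    public class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            final String value = record.value();
            // withValue() returns an independent copy; the input record is untouched.
            context.forward(record.withValue(value == null ? null : value.toUpperCase()));
        }
    }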
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index f036869..aafe64e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -88,9 +88,12 @@ public interface ProcessorContext {
      * Get the state store given the store name.
      *
      * @param name The store name
+     * @param <S> The type or interface of the store to return
      * @return The state store instance
+     *
+     * @throws ClassCastException if the actual store is not an instance of the requested type {@code S}.
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
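
The generified getStateStore() removes the cast that the old signature forced onto every call site. A sketch of the difference (the class and store names are illustrative, not from this commit):

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class CountingProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, Long> counts;

        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            // Previously: (KeyValueStore<String, Long>) context.getStateStore("counts-store")
            // The generic return type now supplies the cast, and a mis-typed
            // request fails fast here with a ClassCastException.
            counts = context.getStateStore("counts-store");
        }

        @Override
        public void process(final String key, final String value) {
            final Long current = counts.get(key);
            counts.put(key, current == null ? 1L : current + 1L);
        }
    }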
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index df53ee2..4f47b12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -25,7 +27,7 @@ import org.apache.kafka.streams.errors.StreamsException;
  * all data into this store directory.
  * The store directory must be created with the state directory.
  * The state directory can be obtained via {@link ProcessorContext#stateDir() #stateDir()} using the
- * {@link ProcessorContext} provided via {@link #init(ProcessorContext, StateStore) init(...)}.
+ * {@link StateStoreContext} provided via {@link #init(StateStoreContext, StateStore) init(...)}.
  * <p>
  * Using nested store directories within the state directory isolates different state stores.
  * If a state store were to write into the state directory directly, it might conflict with other state stores and thus,
@@ -49,7 +51,28 @@ public interface StateStore {
      * Initializes this state store.
      * <p>
      * The implementation of this function must register the root store in the context via the
-     * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+     * the second parameter should be an object of the user's implementation
+     * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users can implement another
+     * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+     * let users implement bulk-load restoration logic instead of restoring one record at a time.
+     * <p>
+     * This method is not called if {@link StateStore#init(StateStoreContext, StateStore)}
+     * is implemented.
+     *
+     * @throws IllegalStateException If the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
+
+    /**
+     * Initializes this state store.
+     * <p>
+     * The implementation of this function must register the root store in the context via the
+     * {@link StateStoreContext#register(StateStore, StateRestoreCallback)} function, where the
      * first {@link StateStore} parameter should always be the passed-in {@code root} object, and
      * the second parameter should be an object of the user's implementation
      * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
@@ -61,7 +84,9 @@ public interface StateStore {
      * @throws IllegalStateException If the store gets registered after initialization has already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final StateStoreContext context, final StateStore root) {
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
 
     /**
      * Flush any cached data
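
The compatibility story in this hunk: stores that only implement the deprecated init(ProcessorContext, StateStore) keep working, because the new default init(StateStoreContext, StateStore) adapts the context and delegates. A new store can do the reverse and implement only the new overload, since the deprecated one is then never invoked. A sketch (the store name and the no-op restore callback are illustrative):

    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreContext;

    public class SketchStore implements StateStore {
        private boolean open = false;

        @Override
        public String name() {
            return "sketch-store";
        }

        // Never invoked by the runtime once the new overload below is
        // implemented, so the deprecated path can simply refuse.
        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context,
                         final StateStore root) {
            throw new UnsupportedOperationException("use init(StateStoreContext, StateStore)");
        }

        @Override
        public void init(final StateStoreContext context, final StateStore root) {
            // Register the *root* store, so changelog records are replayed
            // into the outermost wrapper rather than this inner instance.
            context.register(root, (key, value) -> { /* restore one record */ });
            open = true;
        }

        @Override
        public void flush() { }

        @Override
        public void close() {
            open = false;
        }

        @Override
        public boolean persistent() {
            return false;
        }

        @Override
        public boolean isOpen() {
            return open;
        }
    }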
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
new file mode 100644
index 0000000..43810a2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {
+
+    /**
+     * Returns the application id.
+     *
+     * @return the application id
+     */
+    String applicationId();
+
+    /**
+     * Returns the task id.
+     *
+     * @return the task id
+     */
+    TaskId taskId();
+
+    /**
+     * Returns the default key serde.
+     *
+     * @return the key serializer
+     */
+    Serde<?> keySerde();
+
+    /**
+     * Returns the default value serde.
+     *
+     * @return the value serializer
+     */
+    Serde<?> valueSerde();
+
+    /**
+     * Returns the state directory for the partition.
+     *
+     * @return the state directory
+     */
+    File stateDir();
+
+    /**
+     * Returns Metrics instance.
+     *
+     * @return StreamsMetrics
+     */
+    StreamsMetrics metrics();
+
+    /**
+     * Registers and possibly restores the specified storage engine.
+     *
+     * @param store the storage engine
+     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
+     *
+     * @throws IllegalStateException If the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void register(final StateStore store,
+                  final StateRestoreCallback stateRestoreCallback);
+
+    /**
+     * Returns all the application config properties as key/value pairs.
+     *
+     * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated with the StateStoreContext.
+     *
+     * <p> The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property
+     * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG}
+     * will be of type {@link Class}, even if it was specified as a String to
+     * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}).
+     *
+     * @return all the key/values from the StreamsConfig properties
+     */
+    Map<String, Object> appConfigs();
+
+    /**
+     * Returns all the application config properties with the given key prefix, as key/value pairs
+     * stripping the prefix.
+     *
+     * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated with the StateStoreContext.
+     *
+     * @param prefix the properties prefix
+     * @return the key/values matching the given prefix from the StreamsConfig properties.
+     */
+    Map<String, Object> appConfigsWithPrefix(final String prefix);
+
+}
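
Compared to ProcessorContext, this interface keeps only what a store needs at initialization time: identity, serdes, the state directory, metrics, registration, and configs. A sketch of an init() body using the config accessors (the "sketch." prefix, property name, and directory name are assumptions for illustration):

    import java.io.File;
    import java.util.Map;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreContext;

    // inside a StateStore implementation:
    public void init(final StateStoreContext context, final StateStore root) {
        // A property set as "sketch.cache.bytes" in the StreamsConfig arrives
        // here as "cache.bytes": appConfigsWithPrefix() strips the prefix.
        final Map<String, Object> conf = context.appConfigsWithPrefix("sketch.");
        final Object cacheBytes = conf.get("cache.bytes");

        // Per the StateStore contract, a store isolates its files in its own
        // subdirectory of the task's state directory.
        final File storeDir = new File(context.stateDir(), "sketch-store");

        context.register(root, (key, value) -> { /* restore one record */ });
    }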
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
index d3656c7..167976b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -46,12 +46,11 @@ public interface Processor<KIn, VIn, KOut, VOut> {
     default void init(final ProcessorContext<KOut, VOut> context) {}
 
     /**
-     * Process the record with the given key and value.
+     * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
      *
-     * @param key the key for the record
-     * @param value the value for the record
+     * @param record the record to process
      */
-    void process(KIn key, VIn value);
+    void process(Record<KIn, VIn> record);
 
     /**
      * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index 958126a..c591d51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -16,22 +16,19 @@
  */
 package org.apache.kafka.streams.processor.api;
 
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Processor context interface.
@@ -56,6 +53,27 @@ public interface ProcessorContext<KForward, VForward> {
     TaskId taskId();
 
     /**
+     * The metadata of the source record, if there is one. Processors may be invoked to
+     * process a source record from an input topic, to run a scheduled punctuation
+     * (see {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}),
+     * or because a parent processor called {@link ProcessorContext#forward(Record)}.
+     * <p>
+     * In the case of a punctuation, there is no source record, so this metadata would be
+     * undefined. Note that when a punctuator invokes {@link ProcessorContext#forward(Record)},
+     * downstream processors will receive the forwarded record as a regular
+     * {@link Processor#process(Record)} invocation. In other words, it wouldn't be apparent to
+     * downstream processors whether or not the record being processed came from an input topic
+     * or punctuation and therefore whether or not this metadata is defined. This is why
+     * the return type of this method is {@link Optional}.
+     * <p>
+     * If there is any possibility of punctuators upstream, any access
+     * to this field should consider the case of
+     * "<code>recordMetadata().isPresent() == false</code>".
+     * Of course, it would be safest to always guard this condition.
+     */
+    Optional<RecordMetadata> recordMetadata();
+
+    /**
      * Returns the default key serde.
      *
      * @return the key serde
@@ -84,29 +102,20 @@ public interface ProcessorContext<KForward, VForward> {
     StreamsMetrics metrics();
 
     /**
-     * Registers and possibly restores the specified storage engine.
-     *
-     * @param store the storage engine
-     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
-     *
-     * @throws IllegalStateException If store gets registered after initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
-    void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
-
-    /**
      * Get the state store given the store name.
      *
      * @param name The store name
+     * @param <S> The type or interface of the store to return
      * @return The state store instance
+     *
+     * @throws ClassCastException if the actual store is not an instance of the requested type {@code S}.
      */
     <S extends StateStore> S getStateStore(final String name);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
      * {@link Processor#init(ProcessorContext) initialization} or
-     * {@link Processor#process(Object, Object) processing} to
+     * {@link Processor#process(Record) processing} to
      * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
      * The type parameter controls what notion of time is used for punctuation:
      * <ul>
@@ -140,23 +149,72 @@ public interface ProcessorContext<KForward, VForward> {
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
-     *
-     * @param key key
-     * @param value value
-     */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
-
-    /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
-     *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
-     */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+     * Forwards a record to all child processors.
+     * <p>
+     * Note that the forwarded {@link Record} is shared between the parent and child
+     * processors. And of course, the parent may forward the same object to multiple children,
+     * and the child may forward it to grandchildren, etc. Therefore, you should be mindful
+     * of mutability.
+     * <p>
+     * The {@link Record} class itself is immutable (all the setter-style methods return an
+     * independent copy of the instance). However, the key, value, and headers referenced by
+     * the Record may themselves be mutable.
+     * <p>
+     * Some programs may opt to make use of this mutability for high performance, in which case
+     * the input record may be mutated and then forwarded by each {@link Processor}. However,
+     * most applications should instead favor safety.
+     * <p>
+     * Forwarding records safely simply means making a copy of the record before you mutate it.
+     * This is trivial when using the {@link Record#withKey(Object)}, {@link Record#withValue(Object)},
+     * and {@link Record#withTimestamp(long)} methods, as each of these methods makes a copy of the
+     * record as a matter of course. But a little extra care must be taken with headers, since
+     * the {@link org.apache.kafka.common.header.Header} class is mutable. The easiest way to
+     * safely handle headers is to use the {@link Record} constructors to make a copy before
+     * modifying headers.
+     * <p>
+     * In other words, this would be considered unsafe:
+     * <code>
+     *     process(Record inputRecord) {
+     *         inputRecord.headers().add(...);
+     *         context.forward(inputRecord);
+     *     }
+     * </code>
+     * This is unsafe because the parent, and potentially siblings, grandparents, etc.,
+     * all will see this modification to their shared Headers reference. This is a violation
+     * of causality and could lead to undefined behavior.
+     * <p>
+     * A safe usage would look like this:
+     * <code>
+     *     process(Record inputRecord) {
+     *         // makes a copy of the headers
+     *         Record toForward = inputRecord.withHeaders(inputRecord.headers());
+     *         // Other options to create a safe copy are:
+     *         // * use any copy-on-write method, which makes a copy of all fields:
+     *         //   toForward = inputRecord.withValue(inputRecord.value());
+     *         // * explicitly copy all fields:
+     *         //   toForward = new Record(inputRecord.key(), inputRecord.value(), inputRecord.timestamp(), inputRecord.headers());
+     *         // * create a fresh, empty Headers:
+     *         //   toForward = new Record(inputRecord.key(), inputRecord.value(), inputRecord.timestamp());
+     *         // * etc.
+     *
+     *         // now, we are modifying our own independent copy of the headers.
+     *         toForward.headers().add(...);
+     *         context.forward(toForward);
+     *     }
+     * </code>
+     * @param record The record to forward to all children
+     */
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
+
+    /**
+     * Forwards a record to the specified child processor.
+     * See {@link ProcessorContext#forward(Record)} for considerations.
+     *
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
+     * @see ProcessorContext#forward(Record)
+     */
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
 
     /**
      * Requests a commit.
@@ -164,53 +222,6 @@ public interface ProcessorContext<KForward, VForward> {
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the offset
-     */
-    long offset();
-
-    /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the headers
-     */
-    Headers headers();
-
-    /**
-     * Returns the current timestamp.
-     *
-     * <p> If it is triggered while processing a record streamed from the source processor,
-     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
-     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
-     *
-     * <p> If it is triggered while processing a record generated not from the source processor (for example,
-     * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the largest timestamp of any record processed by the task.
-     *
-     * @return the timestamp
-     */
-    long timestamp();
-
-    /**
      * Returns all the application config properties as key/value pairs.
      *
      * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
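
Putting the new forwarding model together: what used to be forward(key, value, To.child("name")) becomes forward(record, childName), To.all() becomes the plain forward(record), and timestamp overrides move onto the Record via withTimestamp(). A sketch (child names are illustrative and would have to match the Topology wiring):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    public class RoutingProcessor implements Processor<String, Long, String, Long> {
        private ProcessorContext<String, Long> context;

        @Override
        public void init(final ProcessorContext<String, Long> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, Long> record) {
            // To.child("...") is replaced by the childName parameter;
            // a timestamp override would be record.withTimestamp(...).
            if (record.value() != null && record.value() >= 0L) {
                context.forward(record, "non-negatives");
            } else {
                context.forward(record, "negatives");
            }
        }
    }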
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
new file mode 100644
index 0000000..652a647
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * Note: this constructor makes a copy of the headers argument.
+     * See {@link ProcessorContext#forward(Record)} for
+     * considerations around mutability of keys, values, and headers.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link #headers()} to return a non-null, empty, {@link Headers} collection.
+     * @throws StreamsException if the timestamp is negative.
+     * @see ProcessorContext#forward(Record)
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }
+        this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);
+    }
+
+    /**
+     * Convenience constructor in case you do not wish to specify any headers.
+     * Subsequent calls to {@link #headers()} will return a non-null, empty, {@link Headers} collection.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     *
+     * @throws StreamsException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp) {
+        this(key, value, timestamp, null);
+    }
+
+    /**
+     * The key of the record. May be null.
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * The value of the record. May be null.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * The timestamp of the record. Will never be negative.
+     */
+    public long timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * The headers of the record. Never null.
+     */
+    public Headers headers() {
+        return headers;
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the key.
+     *
+     * Copies the attributes of this record with the key replaced.
+     *
+     * @param key The key of the result record. May be null.
+     * @param <NewK> The type of the new record's key.
+     * @return A new Record instance with all the same attributes (except that the key is replaced).
+     */
+    public <NewK> Record<NewK, V> withKey(final NewK key) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the value.
+     *
+     * Copies the attributes of this record with the value replaced.
+     *
+     * @param value The value of the result record.
+     * @param <NewV> The type of the new record's value.
+     * @return A new Record instance with all the same attributes (except that the value is replaced).
+     */
+    public <NewV> Record<K, NewV> withValue(final NewV value) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the timestamp.
+     *
+     * Copies the attributes of this record with the timestamp replaced.
+     *
+     * @param timestamp The timestamp of the result record.
+     * @return A new Record instance with all the same attributes (except that the timestamp is replaced).
+     */
+    public Record<K, V> withTimestamp(final long timestamp) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the headers.
+     *
+     * Copies the attributes of this record with the headers replaced.
+     * Also makes a copy of the provided headers.
+     *
+     * See {@link ProcessorContext#forward(Record)} for
+     * considerations around mutability of keys, values, and headers.
+     *
+     * @param headers The headers of the result record.
+     * @return A new Record instance with all the same attributes (except that the headers are replaced).
+     */
+    public Record<K, V> withHeaders(final Headers headers) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+}
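
A short illustration of the copy-on-write behavior documented above (a sketch, not part of the commit):

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.api.Record;

    public class RecordCopySketch {
        public static void main(final String[] args) {
            final Record<String, Long> original = new Record<>("k", 1L, 42L, new RecordHeaders());

            // Every with* method returns an independent Record; the original is untouched.
            final Record<String, String> restated = original.withValue("one");
            System.out.println(restated.value()); // one

            // The constructor copies the headers it is given, so mutating the
            // copy's headers cannot leak back into the original record.
            final Record<String, Long> copy = original.withHeaders(original.headers());
            copy.headers().add("traced", new byte[0]);
            System.out.println(original.headers().toArray().length); // 0
            System.out.println(copy.headers().toArray().length);     // 1
        }
    }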
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
similarity index 63%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
index 6c5798e..532104a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
@@ -14,28 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.api;
 
-import org.apache.kafka.streams.processor.To;
+public interface RecordMetadata {
+    /**
+     * @return  The topic of the original record received from Kafka
+     */
+    String topic();
 
-public class ToInternal extends To {
-    public ToInternal() {
-        super(To.all());
-    }
+    /**
+     * @return  The partition of the original record received from Kafka
+     */
+    int partition();
 
-    public void update(final To to) {
-        super.update(to);
-    }
-
-    public boolean hasTimestamp() {
-        return timestamp != -1;
-    }
-
-    public long timestamp() {
-        return timestamp;
-    }
-
-    public String child() {
-        return childName;
-    }
-}
\ No newline at end of file
+    /**
+     * @return  The offset of the original record received from Kafka
+     */
+    long offset();
+}
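
Together with ProcessorContext#recordMetadata(), this interface replaces the removed topic()/partition()/offset() accessors. A sketch of the guarded access pattern the Javadoc recommends (class name and tag format are illustrative):

    import java.util.Optional;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.processor.api.RecordMetadata;

    public class OriginTaggingProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            // Metadata is only present for records that originated from an input
            // topic; a record forwarded by an upstream punctuator carries none.
            final Optional<RecordMetadata> metadata = context.recordMetadata();
            final String origin = metadata
                .map(m -> m.topic() + "-" + m.partition() + "@" + m.offset())
                .orElse("punctuation");
            context.forward(record.withValue(origin + ": " + record.value()));
        }
    }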
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 6012817..c29614a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -30,10 +32,10 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
-    public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
@@ -112,64 +114,69 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         stateManager().registerStore(store, stateRestoreCallback);
     }
 
-    /**
-     * @throws IllegalStateException if the task's record is null
-     */
     @Override
     public String topic() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
-        }
-
-        final String topic = recordContext.topic();
-
-        if (NONEXIST_TOPIC.equals(topic)) {
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For topic, the dummy value is `null`.
             return null;
+        } else {
+            return recordContext.topic();
         }
-
-        return topic;
     }
 
-    /**
-     * @throws IllegalStateException if partition is null
-     */
     @Override
     public int partition() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For partition, the dummy value is `-1`.
+            return -1;
+        } else {
+            return recordContext.partition();
         }
-
-        return recordContext.partition();
     }
 
-    /**
-     * @throws IllegalStateException if offset is null
-     */
     @Override
     public long offset() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For offset, the dummy value is `-1L`.
+            return -1L;
+        } else {
+            return recordContext.offset();
         }
-        return recordContext.offset();
     }
 
     @Override
     public Headers headers() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For headers, the dummy value is an empty headers collection.
+            return new RecordHeaders();
+        } else {
+            return recordContext.headers();
         }
-        return recordContext.headers();
     }
 
-    /**
-     * @throws IllegalStateException if timestamp is null
-     */
     @Override
     public long timestamp() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For timestamp, the dummy value is `0L`.
+            return 0L;
+        } else {
+            return recordContext.timestamp();
         }
-        return recordContext.timestamp();
     }
 
     @Override
@@ -196,6 +203,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordContext);
+    }
+
+    @Override
     public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
         this.currentNode = currentNode;
     }
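
In effect, when the record context is undefined (e.g. while punctuating), the two interfaces served by this class behave like this (a sketch; oldContext and newContext are hypothetical handles to the same task context):

    // Via the deprecated ProcessorContext, metadata accessors degrade to
    // dummy values instead of failing:
    oldContext.topic();     // null
    oldContext.partition(); // -1
    oldContext.offset();    // -1L
    oldContext.timestamp(); // 0L
    oldContext.headers();   // empty RecordHeaders

    // Via the new api.ProcessorContext, the absence is explicit:
    newContext.recordMetadata().isPresent(); // false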
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 2b8043a..5e654c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -86,7 +86,7 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         return delegate.getStateStore(name);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 695eb77..3468833 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
-
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.Cancellable;
@@ -26,11 +24,14 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
@@ -49,26 +50,38 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         return stateManager;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         final StateStore store = stateManager.getGlobalStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+    public <K, V> void forward(final Record<K, V> record) {
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         try {
             for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
+                ((ProcessorNode<K, V, ?, ?>) child).process(record);
             }
         } finally {
             setCurrentNode(previousNode);
         }
     }
 
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+        forward(new Record<>(key, value, timestamp(), headers()));
+    }
+
     /**
      * No-op. This should only be called on GlobalStateStore#flush and there should be no child nodes
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2864415..bd3aa3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
@@ -147,7 +148,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+            stateStore.init((StateStoreContext) globalProcessorContext, stateStore);
         }
 
         // make sure each topic-partition from checkpointFileCache is associated with a global state store
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 3664137..6b1378b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.Record;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -104,7 +105,13 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                     deserialized.headers());
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
+            final Record<Object, Object> toProcess = new Record<>(
+                deserialized.key(),
+                deserialized.value(),
+                processorContext.timestamp(),
+                processorContext.headers()
+            );
+            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
         }
 
         offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
deleted file mode 100644
index 39c3084..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.RecordContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-/**
- * For internal use so we can update the {@link RecordContext} and current
- * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
- * {@link ThreadCache}
- */
-public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {
-
-    @Override
-    StreamsMetricsImpl metrics();
-
-    /**
-     * @param timeMs current wall-clock system timestamp in milliseconds
-     */
-    void setSystemTimeMs(long timeMs);
-
-    /**
-     * @return the current wall-clock system timestamp in milliseconds
-     */
-    long currentSystemTimeMs();
-
-    /**
-     * Returns the current {@link RecordContext}
-     * @return the current {@link RecordContext}
-     */
-    ProcessorRecordContext recordContext();
-
-    /**
-     * @param recordContext the {@link ProcessorRecordContext} for the record about to be processes
-     */
-    void setRecordContext(ProcessorRecordContext recordContext);
-
-    /**
-     * @param currentNode the current {@link ProcessorNode}
-     */
-    void setCurrentNode(ProcessorNode<?, ?, ?, ?> currentNode);
-
-    /**
-     * Get the current {@link ProcessorNode}
-     */
-    ProcessorNode<?, ?, ?, ?> currentNode();
-
-    /**
-     * Get the thread-global cache
-     */
-    ThreadCache cache();
-
-    /**
-     * Mark this context as being initialized
-     */
-    void initialize();
-
-    /**
-     * Mark this context as being uninitialized
-     */
-    void uninitialize();
-
-    /**
-     * @return the type of task (active/standby/global) that this context corresponds to
-     */
-    TaskType taskType();
-
-    /**
-     * Transition to active task and register a new task and cache to this processor context
-     */
-    void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache);
-
-    /**
-     * Transition to standby task and register a dummy cache to this processor context
-     */
-    void transitionToStandby(final ThreadCache newCache);
-
-    /**
-     * Register a dirty entry flush listener for a particular namespace
-     */
-    void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener);
-
-    /**
-     * Get a correctly typed state store, given a handle on the original builder.
-     */
-    @SuppressWarnings("unchecked")
-    default <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
-        return (T) getStateStore(builder.name());
-    }
-
-    void logChange(final String storeName,
-                   final Bytes key,
-                   final byte[] value,
-                   final long timestamp);
-
-    String changelogFor(final String storeName);
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 8e4ec25..f4b922b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
  * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext extends ProcessorContext {
+public interface InternalProcessorContext
+    extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, StateStoreContext {
+
     BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
     ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
 
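With this change, one internal context object now satisfies the old processor API, the new processor API, and the store-facing API at once, which is what allows the adapter pair (ProcessorContextAdapter / ProcessorContextReverseAdapter) to be deleted below. An illustrative sketch of the resulting assignability (not part of the patch):

    // Illustrative sketch only: all three public context views are upcasts of
    // the same internal context after this change.
    static void views(final InternalProcessorContext internal) {
        final org.apache.kafka.streams.processor.ProcessorContext oldApi = internal;
        final org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object> newApi = internal;
        final StateStoreContext storeContext = internal;
    }
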
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index d8e4af4..f067bbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 
 public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
     private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
+    private InternalProcessorContext context;
 
     public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
         if (delegate == null) {
@@ -47,12 +49,29 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
     @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext<KOut, VOut> context) {
-        delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+        // It only makes sense to use this adapter internally within Streams, where
+        // all contexts are implementations of InternalProcessorContext.
+        // The cast below would fail if someone used this adapter in a unit test
+        // where the context only implements api.ProcessorContext.
+        this.context = (InternalProcessorContext) context;
+        delegate.init((org.apache.kafka.streams.processor.ProcessorContext) context);
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        delegate.process(key, value);
+    public void process(final Record<KIn, VIn> record) {
+        final ProcessorRecordContext processorRecordContext = context.recordContext();
+        try {
+            context.setRecordContext(new ProcessorRecordContext(
+                record.timestamp(),
+                context.offset(),
+                context.partition(),
+                context.topic(),
+                record.headers()
+            ));
+            delegate.process(record.key(), record.value());
+        } finally {
+            context.setRecordContext(processorRecordContext);
+        }
     }
 
     @Override
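
The try/finally above pins the incoming record's timestamp and headers into the record context for the duration of the delegate call, so the old-API processor's context.timestamp() and context.headers() reflect the Record it is processing. An illustrative sketch of the adapter's entry point (MyLegacyWordFilter is a hypothetical old-API processor, not part of the patch):

    // Illustrative sketch only -- not part of this patch.
    final org.apache.kafka.streams.processor.Processor<String, String> legacy =
        new MyLegacyWordFilter(); // hypothetical old-API implementation
    final Processor<String, String, String, String> adapted = ProcessorAdapter.adapt(legacy);
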
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
deleted file mode 100644
index 85dbd52..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextAdapter<KForward, VForward>
-    implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {
-
-    private final InternalProcessorContext delegate;
-
-    @SuppressWarnings("unchecked")
-    public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) {
-        if (delegate instanceof ProcessorContextReverseAdapter) {
-            return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
-        } else {
-            return new ProcessorContextAdapter<>(delegate);
-        }
-    }
-
-    private ProcessorContextAdapter(final InternalProcessorContext delegate) {
-        this.delegate = delegate;
-    }
-
-    @Override
-    public String applicationId() {
-        return delegate.applicationId();
-    }
-
-    @Override
-    public TaskId taskId() {
-        return delegate.taskId();
-    }
-
-    @Override
-    public Serde<?> keySerde() {
-        return delegate.keySerde();
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return delegate.valueSerde();
-    }
-
-    @Override
-    public File stateDir() {
-        return delegate.stateDir();
-    }
-
-    @Override
-    public StreamsMetricsImpl metrics() {
-        return delegate.metrics();
-    }
-
-    @Override
-    public void setSystemTimeMs(final long timeMs) {
-        delegate.setSystemTimeMs(timeMs);
-    }
-
-    @Override
-    public long currentSystemTimeMs() {
-        return delegate.currentSystemTimeMs();
-    }
-
-    @Override
-    public ProcessorRecordContext recordContext() {
-        return delegate.recordContext();
-    }
-
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        delegate.setRecordContext(recordContext);
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
-        delegate.setCurrentNode(currentNode);
-    }
-
-    @Override
-    public ProcessorNode<?, ?, ?, ?> currentNode() {
-        return delegate.currentNode();
-    }
-
-    @Override
-    public ThreadCache cache() {
-        return delegate.cache();
-    }
-
-    @Override
-    public void initialize() {
-        delegate.initialize();
-    }
-
-    @Override
-    public void uninitialize() {
-        delegate.uninitialize();
-    }
-
-    @Override
-    public Task.TaskType taskType() {
-        return delegate.taskType();
-    }
-
-    @Override
-    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
-        delegate.transitionToActive(streamTask, recordCollector, newCache);
-    }
-
-    @Override
-    public void transitionToStandby(final ThreadCache newCache) {
-        delegate.transitionToStandby(newCache);
-    }
-
-    @Override
-    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
-        delegate.registerCacheFlushListener(namespace, listener);
-    }
-
-    @Override
-    public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
-        return delegate.getStateStore(builder);
-    }
-
-    @Override
-    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
-        delegate.logChange(storeName, key, value, timestamp);
-    }
-
-    @Override
-    public String changelogFor(final String storeName) {
-        return delegate.changelogFor(storeName);
-    }
-
-    @Override
-    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
-        delegate.register(store, stateRestoreCallback);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <S extends StateStore> S getStateStore(final String name) {
-        return (S) delegate.getStateStore(name);
-    }
-
-    @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
-        return delegate.schedule(interval, type, callback);
-    }
-
-    @Override
-    public <K extends KForward, V extends VForward> void forward(final K key, final V value) {
-        delegate.forward(key, value);
-    }
-
-    @Override
-    public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) {
-        delegate.forward(key, value, to);
-    }
-
-    @Override
-    public void commit() {
-        delegate.commit();
-    }
-
-    @Override
-    public String topic() {
-        return delegate.topic();
-    }
-
-    @Override
-    public int partition() {
-        return delegate.partition();
-    }
-
-    @Override
-    public long offset() {
-        return delegate.offset();
-    }
-
-    @Override
-    public Headers headers() {
-        return delegate.headers();
-    }
-
-    @Override
-    public long timestamp() {
-        return delegate.timestamp();
-    }
-
-    @Override
-    public Map<String, Object> appConfigs() {
-        return delegate.appConfigs();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
-        return delegate.appConfigsWithPrefix(prefix);
-    }
-
-    InternalProcessorContext delegate() {
-        return delegate;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index e12dfe1..ad1017e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
@@ -30,13 +27,16 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.List;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+import java.util.Map;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
@@ -47,9 +47,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     private StreamTask streamTask;
     private RecordCollector collector;
 
-    private final ToInternal toInternal = new ToInternal();
-    private final static To SEND_TO_ALL = To.all();
-
     private final ProcessorStateManager stateManager;
 
     final Map<String, DirtyEntryFlushListener> cacheNameToFlushListener = new HashMap<>();
@@ -135,8 +132,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
      * @throws UnsupportedOperationException if the current streamTask type is standby
      */
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         throwUnsupportedOperationExceptionIfStandby("getStateStore");
         if (currentNode() == null) {
             throw new StreamsException("Accessing from an unknown node");
@@ -144,7 +142,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         final StateStore globalStore = stateManager.getGlobalStore(name);
         if (globalStore != null) {
-            return getReadOnlyStore(globalStore);
+            return (S) getReadOnlyStore(globalStore);
         }
 
         if (!currentNode().stateStores.contains(name)) {
@@ -159,14 +157,19 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
@@ -174,11 +177,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
@@ -186,57 +191,89 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
         throwUnsupportedOperationExceptionIfStandby("forward");
+
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         final ProcessorRecordContext previousContext = recordContext;
 
         try {
-            toInternal.update(to);
-            if (toInternal.hasTimestamp()) {
+            // We don't want to set the recordContext if it's null, since that means
+            // the context itself is undefined. This isn't perfect: downstream
+            // old-API processors won't see the timestamps or headers of upstream
+            // new-API processors. But then again, from the perspective of those old-API
+            // processors, even consulting the timestamp or headers when the record
+            // context is undefined is itself not well defined. Heterogeneous
+            // applications, in which an upstream processor implements the new API and
+            // a downstream one implements the old API, should be rare enough not to
+            // worry about, so this seems like a fine compromise for now.
+            if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
                 recordContext = new ProcessorRecordContext(
-                    toInternal.timestamp(),
+                    record.timestamp(),
                     recordContext.offset(),
                     recordContext.partition(),
                     recordContext.topic(),
-                    recordContext.headers());
+                    record.headers());
             }
 
-            final String sendTo = toInternal.child();
-            if (sendTo == null) {
+            if (childName == null) {
                 final List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
                 for (final ProcessorNode<?, ?, ?, ?> child : children) {
-                    forward((ProcessorNode<K, V, ?, ?>) child, key, value);
+                    forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
                 }
             } else {
-                final ProcessorNode<?, ?, ?, ?> child = currentNode().getChild(sendTo);
+                final ProcessorNode<?, ?, ?, ?> child = currentNode().getChild(childName);
                 if (child == null) {
-                    throw new StreamsException("Unknown downstream node: " + sendTo
-                        + " either does not exist or is not connected to this processor.");
+                    throw new StreamsException("Unknown downstream node: " + childName
+                                                   + " either does not exist or is not connected to this processor.");
                 }
-                forward((ProcessorNode<K, V, ?, ?>) child, key, value);
+                forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
             }
+
         } finally {
             recordContext = previousContext;
             setCurrentNode(previousNode);
         }
     }
 
-    private <K, V> void forward(final ProcessorNode<K, V, ?, ?> child,
-                                final K key,
-                                final V value) {
+    private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
+                                        final Record<K, V> record) {
         setCurrentNode(child);
-        child.process(key, value);
+
+        child.process(record);
+
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());
+            streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
         }
     }
 
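From a new-API processor's point of view, the rework above means forwards carry the timestamp and headers on the Record itself rather than implicitly through the context. An illustrative sketch (the processor class and child name are hypothetical, not part of the patch):

    // Illustrative sketch only -- not part of this patch.
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    public class ShoutingProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            // Forward to all children; the record's timestamp and headers travel with it.
            context.forward(record.withValue(record.value() + "!"));
            // Or forward to one child by name; an unknown name raises StreamsException.
            context.forward(record, "my-child");
        }
    }
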
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
deleted file mode 100644
index 6e82a5e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
-
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
-        if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
-        } else {
-            return new ProcessorContextReverseAdapter(delegate);
-        }
-    }
-
-    private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
-        this.delegate = delegate;
-    }
-
-    @Override
-    public String applicationId() {
-        return delegate.applicationId();
-    }
-
-    @Override
-    public TaskId taskId() {
-        return delegate.taskId();
-    }
-
-    @Override
-    public Serde<?> keySerde() {
-        return delegate.keySerde();
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return delegate.valueSerde();
-    }
-
-    @Override
-    public File stateDir() {
-        return delegate.stateDir();
-    }
-
-    @Override
-    public StreamsMetricsImpl metrics() {
-        return delegate.metrics();
-    }
-
-    @Override
-    public void setSystemTimeMs(final long timeMs) {
-        delegate.setSystemTimeMs(timeMs);
-    }
-
-    @Override
-    public long currentSystemTimeMs() {
-        return delegate.currentSystemTimeMs();
-    }
-
-    @Override
-    public ProcessorRecordContext recordContext() {
-        return delegate.recordContext();
-    }
-
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        delegate.setRecordContext(recordContext);
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
-        delegate.setCurrentNode(currentNode);
-    }
-
-    @Override
-    public ProcessorNode<?, ?, ?, ?> currentNode() {
-        return delegate.currentNode();
-    }
-
-    @Override
-    public ThreadCache cache() {
-        return delegate.cache();
-    }
-
-    @Override
-    public void initialize() {
-        delegate.initialize();
-    }
-
-    @Override
-    public void uninitialize() {
-        delegate.uninitialize();
-    }
-
-    @Override
-    public Task.TaskType taskType() {
-        return delegate.taskType();
-    }
-
-    @Override
-    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
-        delegate.transitionToActive(streamTask, recordCollector, newCache);
-    }
-
-    @Override
-    public void transitionToStandby(final ThreadCache newCache) {
-        delegate.transitionToStandby(newCache);
-    }
-
-    @Override
-    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
-        delegate.registerCacheFlushListener(namespace, listener);
-    }
-
-    @Override
-    public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
-        return delegate.getStateStore(builder);
-    }
-
-    @Override
-    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
-        delegate.logChange(storeName, key, value, timestamp);
-    }
-
-    @Override
-    public String changelogFor(final String storeName) {
-        return delegate.changelogFor(storeName);
-    }
-
-    @Override
-    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
-        delegate.register(store, stateRestoreCallback);
-    }
-
-    @Override
-    public StateStore getStateStore(final String name) {
-        return delegate.getStateStore(name);
-    }
-
-    @Deprecated
-    @Override
-    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
-        return delegate.schedule(Duration.ofMillis(intervalMs), type, callback);
-    }
-
-    @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
-        return delegate.schedule(interval, type, callback);
-    }
-
-    @Override
-    public <K, V> void forward(final K key, final V value) {
-        delegate.forward(key, value);
-    }
-
-    @Override
-    public <K, V> void forward(final K key, final V value, final To to) {
-        delegate.forward(key, value, to);
-    }
-
-    @Deprecated
-    @Override
-    public <K, V> void forward(final K key, final V value, final int childIndex) {
-        delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name()));
-    }
-
-    @Deprecated
-    @Override
-    public <K, V> void forward(final K key, final V value, final String childName) {
-        delegate.forward(key, value, To.child(childName));
-    }
-
-    @Override
-    public void commit() {
-        delegate.commit();
-    }
-
-    @Override
-    public String topic() {
-        return delegate.topic();
-    }
-
-    @Override
-    public int partition() {
-        return delegate.partition();
-    }
-
-    @Override
-    public long offset() {
-        return delegate.offset();
-    }
-
-    @Override
-    public Headers headers() {
-        return delegate.headers();
-    }
-
-    @Override
-    public long timestamp() {
-        return delegate.timestamp();
-    }
-
-    @Override
-    public Map<String, Object> appConfigs() {
-        return delegate.appConfigs();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
-        return delegate.appConfigsWithPrefix(prefix);
-    }
-
-    InternalApiProcessorContext<Object, Object> delegate() {
-        return delegate;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 122a3bc..38daa52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -20,8 +20,10 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
@@ -104,6 +106,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         childByName.put(child.name, child);
     }
 
+    @SuppressWarnings("unchecked")
     public void init(final InternalProcessorContext context) {
         if (!closed)
             throw new IllegalStateException("The processor is not closed");
@@ -114,7 +117,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init(ProcessorContextAdapter.adapt(context));
+                        processor.init((ProcessorContext<KOut, VOut>) context);
                     }
                 },
                 time,
@@ -171,14 +174,14 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
     }
 
 
-    public void process(final KIn key, final VIn value) {
+    public void process(final Record<KIn, VIn> record) {
         throwIfClosed();
 
         try {
-            maybeMeasureLatency(() -> processor.process(key, value), time, processSensor);
+            maybeMeasureLatency(() -> processor.process(record), time, processSensor);
         } catch (final ClassCastException e) {
-            final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
-            final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+            final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
+            final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
             throw new StreamsException(String.format("ClassCastException invoking Processor. Do the Processor's "
                     + "input types match the deserialized types? Check the Serde setup and change the default Serdes in "
                     + "StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept "
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 5dd0062..2e979ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.processor.RecordContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
@@ -29,7 +30,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
 
-public class ProcessorRecordContext implements RecordContext {
+public class ProcessorRecordContext implements RecordContext, RecordMetadata {
 
     private final long timestamp;
     private final long offset;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index aa69752..4948ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -197,7 +198,7 @@ public class ProcessorStateManager implements StateManager {
             if (stores.containsKey(store.name())) {
                 maybeRegisterStoreWithChangelogReader(store.name());
             } else {
-                store.init(processorContext, store);
+                store.init((StateStoreContext) processorContext, store);
             }
             log.trace("Registered state store {}", store.name());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index a77deb8..2e4307c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.api.Record;
 
 public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
@@ -76,19 +76,27 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
         }
     }
 
-
     @Override
-    public void process(final KIn key, final VIn value) {
+    public void process(final Record<KIn, VIn> record) {
         final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
 
-        final long timestamp = context.timestamp();
-        if (timestamp < 0) {
-            throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
-        }
+        final KIn key = record.key();
+        final VIn value = record.value();
+
+        final long timestamp = record.timestamp();
+
+        final ProcessorRecordContext contextForExtraction =
+            new ProcessorRecordContext(
+                timestamp,
+                context.offset(),
+                context.partition(),
+                context.topic(),
+                record.headers()
+            );
 
-        final String topic = topicExtractor.extract(key, value, this.context.recordContext());
+        final String topic = topicExtractor.extract(key, value, contextForExtraction);
 
-        collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
+        collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
     }
 
     /**
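
Since the sink now derives the ProcessorRecordContext for topic extraction from the forwarded record itself, a custom TopicNameExtractor observes that record's timestamp and headers. An illustrative sketch (the "dest" header key and fallback topic name are hypothetical, not part of the patch):

    // Illustrative sketch only -- not part of this patch.
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.streams.processor.TopicNameExtractor;

    final TopicNameExtractor<String, String> byHeader = (key, value, recordContext) -> {
        // Route by a hypothetical "dest" header, falling back to a default topic.
        final Header dest = recordContext.headers().lastHeader("dest");
        return dest == null ? "default-topic" : new String(dest.value(), StandardCharsets.UTF_8);
    };
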
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 6dbdfee..7fa8c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
 public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -92,8 +93,8 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
 
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        context.forward(key, value);
+    public void process(final Record<KIn, VIn> record) {
+        context.forward(record);
         processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
similarity index 60%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
index 2b8043a..ae2e42f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
@@ -19,34 +19,33 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
-import java.util.Objects;
 
-/**
- * {@code ProcessorContext} implementation that will throw on any forward call.
- */
-public final class ForwardingDisabledProcessorContext implements ProcessorContext {
-    private final ProcessorContext delegate;
+public final class StoreToProcessorContextAdapter implements ProcessorContext {
+    private final StateStoreContext delegate;
 
-    private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, "
-        + "as the framework must ensure the key is not changed (#forward allows changing the key on "
-        + "messages which are sent). Try another function, which doesn't allow the key to be changed "
-        + "(for example - #tranformValues).";
+    public static ProcessorContext adapt(final StateStoreContext delegate) {
+        if (delegate instanceof ProcessorContext) {
+            return (ProcessorContext) delegate;
+        } else {
+            return new StoreToProcessorContextAdapter(delegate);
+        }
+    }
 
-    public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
-        this.delegate = Objects.requireNonNull(delegate, "delegate");
+    private StoreToProcessorContextAdapter(final StateStoreContext delegate) {
+        this.delegate = delegate;
     }
 
     @Override
@@ -80,81 +79,76 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
     }
 
     @Override
-    public void register(final StateStore store,
-                         final StateRestoreCallback stateRestoreCallback) {
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
         delegate.register(store, stateRestoreCallback);
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
-        return delegate.getStateStore(name);
+    public <S extends StateStore> S getStateStore(final String name) {
+        throw new UnsupportedOperationException("StateStores can't access getStateStore.");
     }
 
-    @Override
     @Deprecated
-    public Cancellable schedule(final long intervalMs,
-                                final PunctuationType type,
-                                final Punctuator callback) {
-        return delegate.schedule(intervalMs, type, callback);
+    @Override
+    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+        throw new UnsupportedOperationException("StateStores can't access schedule.");
     }
 
     @Override
-    public Cancellable schedule(final Duration interval,
-                                final PunctuationType type,
-                                final Punctuator callback) throws IllegalArgumentException {
-        return delegate.schedule(interval, type, callback);
+    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+        throw new UnsupportedOperationException("StateStores can't access schedule.");
     }
 
     @Override
     public <K, V> void forward(final K key, final V value) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
-    @Override
     @Deprecated
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
-    @Override
     @Deprecated
+    @Override
     public <K, V> void forward(final K key, final V value, final String childName) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
     @Override
     public void commit() {
-        delegate.commit();
+        throw new UnsupportedOperationException("StateStores can't access commit.");
     }
 
     @Override
     public String topic() {
-        return delegate.topic();
+        throw new UnsupportedOperationException("StateStores can't access topic.");
     }
 
     @Override
     public int partition() {
-        return delegate.partition();
+        throw new UnsupportedOperationException("StateStores can't access partition.");
     }
 
     @Override
     public long offset() {
-        return delegate.offset();
+        throw new UnsupportedOperationException("StateStores can't access offset.");
     }
 
     @Override
     public Headers headers() {
-        return delegate.headers();
+        throw new UnsupportedOperationException("StateStores can't access headers.");
     }
 
     @Override
     public long timestamp() {
-        return delegate.timestamp();
+        throw new UnsupportedOperationException("StateStores can't access timestamp.");
     }
 
     @Override
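
This adapter replaces the deleted two-way context adapters with a one-way shim: a store handed only a StateStoreContext can still be given something satisfying the classic ProcessorContext signature, while every processing-only method fails fast with UnsupportedOperationException. A sketch of how a store implementation might bridge its two init variants (the surrounding store class is hypothetical, not part of the patch):

    // Illustrative sketch only -- not part of this patch.
    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // Reuse the existing ProcessorContext-based init via the adapter.
        init(StoreToProcessorContextAdapter.adapt(context), root);
    }
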
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e09f33c..9b3726f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@@ -671,9 +672,26 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             log.trace("Start processing one record [{}]", record);
 
-            updateProcessorContext(record, currNode, wallClockTime);
+            updateProcessorContext(
+                currNode,
+                wallClockTime,
+                new ProcessorRecordContext(
+                    record.timestamp,
+                    record.offset(),
+                    record.partition(),
+                    record.topic(),
+                    record.headers()
+                )
+            );
+
             maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
-            maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), time, processLatencySensor);
+            final Record<Object, Object> toProcess = new Record<>(
+                record.key(),
+                record.value(),
+                processorContext.timestamp(),
+                processorContext.headers()
+            );
+            maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
 
             log.trace("Completed processing one record [{}]", record);
 
@@ -743,8 +761,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
-        updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null),
-            timestamp), node, time.milliseconds());
+        updateProcessorContext(node, time.milliseconds(), null);
 
         if (log.isTraceEnabled()) {
             log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
@@ -761,14 +778,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?, ?, ?> currNode, final long wallClockTime) {
-        processorContext.setRecordContext(
-            new ProcessorRecordContext(
-                record.timestamp,
-                record.offset(),
-                record.partition(),
-                record.topic(),
-                record.headers()));
+    private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
+                                        final long wallClockTime,
+                                        final ProcessorRecordContext recordContext) {
+        processorContext.setRecordContext(recordContext);
         processorContext.setCurrentNode(currNode);
         processorContext.setSystemTimeMs(wallClockTime);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
index 6c5798e..8865846 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
@@ -23,6 +23,10 @@ public class ToInternal extends To {
         super(To.all());
     }
 
+    public ToInternal(final To to) {
+        super(to);
+    }
+
     public void update(final To to) {
         super.update(to);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 326d277..38baeb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -866,9 +867,9 @@ public class KafkaStreamsTest {
                     }
 
                     @Override
-                    public void process(final String key, final String value) {
-                        if (value.length() % 2 == 0) {
-                            context.forward(key, key + value);
+                    public void process(final Record<String, String> record) {
+                        if (record.value().length() % 2 == 0) {
+                            context.forward(record.withValue(record.key() + record.value()));
                         }
                     }
                 }, "source")
@@ -967,11 +968,11 @@ public class KafkaStreamsTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record) {
                     final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
-                    kvStore.put(key, 5L);
+                    kvStore.put(record.key(), 5L);
 
-                    context.forward(key, "5");
+                    context.forward(record.withValue("5"));
                     context.commit();
                 }
             }, "source")
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 807c50e..415aaea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -106,8 +107,8 @@ public class StreamsBuilderTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
-                    store.put(key, value);
+                public void process(final Record<String, String> record) {
+                    store.put(record.key(), record.value());
                 }
             }
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 57e742b..ef9becf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -371,15 +372,12 @@ public class TopologyTest {
         public Processor<Object, Object, Object, Object> get() {
             return new Processor<Object, Object, Object, Object>() {
                 @Override
-                public void init(final ProcessorContext context) {
+                public void init(final ProcessorContext<Object, Object> context) {
                     context.getStateStore(STORE_NAME);
                 }
 
                 @Override
-                public void process(final Object key, final Object value) { }
-
-                @Override
-                public void close() { }
+                public void process(final Record<Object, Object> record) { }
             };
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 8a65bc3..ab2adbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.To;
@@ -131,7 +132,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         }
 
         sessionStore = storeBuilder.build();
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
     }
 
     @After
@@ -640,7 +641,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
                 Serdes.Long())
                 .withLoggingDisabled();
         final SessionStore<String, Long> sessionStore = storeBuilder.build();
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
         return context;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 19eb1d2..08bd5e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
@@ -60,7 +61,7 @@ public class KStreamTransformTest {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
-                        timestamp -> context.forward(-1, (int) timestamp)
+                        timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp))
                     );
                 }
 
@@ -126,7 +127,7 @@ public class KStreamTransformTest {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
-                        timestamp -> context.forward(-1, (int) timestamp));
+                        timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp)));
                 }
 
                 @Override
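
The notable change here is the added To.all().withTimestamp(timestamp): a record forwarded from a wall-clock punctuator has no current input record to inherit a timestamp from, so the tests now pin one explicitly. The scheduling pattern, extracted as a sketch:

    import java.time.Duration;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.processor.To;

    // Schedule a wall-clock punctuation that forwards with an explicit timestamp,
    // since no input record is being processed when the punctuator fires.
    static void scheduleForward(final ProcessorContext context) {
        context.schedule(
            Duration.ofMillis(1),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp))
        );
    }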
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index dc0b69d..3b46765 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
@@ -243,7 +244,7 @@ public class KTableSuppressProcessorMetricsTest {
         context.setCurrentNode(new ProcessorNode("testNode"));
         context.setSystemTimeMs(time.milliseconds());
 
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         processor.init(context);
 
         final long timestamp = 100L;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 1d1d6fb..778af9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
@@ -85,7 +86,7 @@ public class KTableSuppressProcessorTest {
             final MockInternalProcessorContext context = new MockInternalProcessorContext();
             context.setCurrentNode(new ProcessorNode("testNode"));
 
-            buffer.init(context, buffer);
+            buffer.init((StateStoreContext) context, buffer);
             processor.init(context);
 
             this.processor = processor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index f39d730..4cef063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
@@ -44,6 +45,8 @@ import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 public class AbstractProcessorContextTest {
@@ -81,14 +84,9 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
+    public void shouldReturnNullTopicIfNoRecordContext() {
         context.setRecordContext(null);
-        try {
-            context.topic();
-            fail("should throw illegal state exception when record context is null");
-        } catch (final IllegalStateException e) {
-            // pass
-        }
+        assertThat(context.topic(), is(nullValue()));
     }
 
     @Test
@@ -104,19 +102,14 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfTopicEqualsNonExistTopic() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
+        context.setRecordContext(null);
         assertThat(context.topic(), nullValue());
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() {
+    public void shouldReturnDummyPartitionIfNoRecordContext() {
         context.setRecordContext(null);
-        try {
-            context.partition();
-            fail("should throw illegal state exception when record context is null");
-        } catch (final IllegalStateException e) {
-            // pass
-        }
+        assertThat(context.partition(), is(-1));
     }
 
     @Test
@@ -140,14 +133,9 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() {
+    public void shouldReturnDummyTimestampIfNoRecordContext() {
         context.setRecordContext(null);
-        try {
-            context.timestamp();
-            fail("should throw illegal state exception when record context is null");
-        } catch (final IllegalStateException e) {
-            // pass
-        }
+        assertThat(context.timestamp(), is(0L));
     }
 
     @Test
@@ -161,9 +149,9 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldReturnNullIfHeadersAreNotSet() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
-        assertThat(context.headers(), nullValue());
+    public void shouldReturnEmptyHeadersIfHeadersAreNotSet() {
+        context.setRecordContext(null);
+        assertThat(context.headers(), is(emptyIterable()));
     }
 
     @Test
@@ -211,7 +199,7 @@ public class AbstractProcessorContextTest {
         }
 
         @Override
-        public StateStore getStateStore(final String name) {
+        public <S extends StateStore> S getStateStore(final String name) {
             return null;
         }
 
@@ -231,6 +219,12 @@ public class AbstractProcessorContextTest {
         }
 
         @Override
+        public <K, V> void forward(final Record<K, V> record) {}
+
+        @Override
+        public <K, V> void forward(final Record<K, V> record, final String childName) {}
+
+        @Override
         public <K, V> void forward(final K key, final V value) {}
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index ad8cd0a..a83c92b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -30,6 +32,7 @@ import org.hamcrest.core.IsInstanceOf;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
@@ -97,11 +100,13 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldForwardToSingleChild() {
-        child.process(null, null);
+        child.process(anyObject());
         expectLastCall();
 
+        expect(recordContext.timestamp()).andStubReturn(0L);
+        expect(recordContext.headers()).andStubReturn(new RecordHeaders());
         replay(child, recordContext);
-        globalContext.forward(null, null);
+        globalContext.forward((Object /*forcing a call to the K/V forward*/) null, null);
         verify(child, recordContext);
     }
 
@@ -142,7 +147,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -151,7 +156,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -160,7 +165,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -169,7 +174,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -178,7 +183,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForSessionStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
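
Both casts in this file exist purely for overload resolution: with forward(Record) and forward(K, V) now coexisting, and StateStore.init() doubled up the same way, a bare null argument is ambiguous. Schematically (names as in the test):

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;

    static void disambiguate(final GlobalProcessorContextImpl globalContext, final StateStore store) {
        globalContext.forward((Object) null, null); // picks the K/V forward, not forward(Record)
        store.init((ProcessorContext) null, null);  // picks the deprecated init() overload
    }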
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 6ce2bae..3cc06be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil.record;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -127,7 +128,7 @@ public class GlobalStateTaskTest {
     @Test
     public void shouldProcessRecordsForTopic() {
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
+        globalStateTask.update(record(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
         assertEquals(1, sourceOne.numReceived);
         assertEquals(0, sourceTwo.numReceived);
     }
@@ -136,7 +137,7 @@ public class GlobalStateTaskTest {
     public void shouldProcessRecordsForOtherTopic() {
         final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic2, 1, 1, integerBytes, integerBytes));
+        globalStateTask.update(record(topic2, 1, 1, integerBytes, integerBytes));
         assertEquals(1, sourceTwo.numReceived);
         assertEquals(0, sourceOne.numReceived);
     }
@@ -215,7 +216,7 @@ public class GlobalStateTaskTest {
         expectedOffsets.put(t1, 52L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.update(record(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.flushState();
         assertEquals(expectedOffsets, stateMgr.changelogOffsets());
     }
@@ -226,7 +227,7 @@ public class GlobalStateTaskTest {
         expectedOffsets.put(t1, 102L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 101, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.update(record(topic1, 1, 101, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.flushState();
         assertThat(stateMgr.changelogOffsets(), equalTo(expectedOffsets));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 7d5773b..a69bd12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -49,6 +48,7 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
+import static org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil.record;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -253,7 +253,7 @@ public class GlobalStreamThreadTest {
             "Thread never started.");
 
         mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 1L));
-        mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
+        mockConsumer.addRecord(record(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
 
         TestUtils.waitForCondition(
             () -> mockConsumer.position(topicPartition) == 1L,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index f4b62c3d..ab88efa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -767,7 +767,7 @@ public class ProcessorContextImplTest {
         assertTrue(store.persistent());
         assertTrue(store.isOpen());
 
-        checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+        checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()");
         checkThrowsUnsupportedOperation(store::close, "close()");
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 0d79ae5..68b33f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.Properties;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -36,6 +36,7 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -202,7 +203,10 @@ public class ProcessorNodeTest {
         final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
         final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
-        final StreamsException se = assertThrows(StreamsException.class, () -> node.process("aKey", "aValue"));
+        final StreamsException se = assertThrows(
+            StreamsException.class,
+            () -> node.process(new Record<>("aKey", "aValue", 0))
+        );
         assertThat(se.getCause(), instanceOf(ClassCastException.class));
         assertThat(se.getMessage(), containsString("default Serdes"));
         assertThat(se.getMessage(), containsString("input types"));
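
The node is now driven with the new Record type; its three-argument constructor is (key, value, timestamp), and judging by the api.Record addition in this commit there is also a variant that accepts Headers (treat that as an assumption here). For instance:

    import org.apache.kafka.streams.processor.api.Record;

    static Record<String, String> sample() {
        // key, value, and an explicit (non-negative) timestamp
        return new Record<>("aKey", "aValue", 0L);
    }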
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 448c2b1..439cd59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
@@ -295,7 +296,7 @@ public class ProcessorStateManagerTest {
         expect(store.name()).andStubReturn(persistentStoreName);
 
         context.uninitialize();
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         replay(storeMetadata, context, store);
 
         stateMgr.registerStateStores(singletonList(store), context);
@@ -325,7 +326,7 @@ public class ProcessorStateManagerTest {
         expect(store.name()).andStubReturn(persistentStoreName);
 
         context.uninitialize();
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         replay(storeMetadata, context, store);
 
         stateMgr.registerStateStores(singletonList(store), context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 77dc6af..bb63637 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -35,10 +35,10 @@ import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -775,8 +775,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value);
+        public void process(final Record<String, String> record) {
+            context.forward(record);
         }
     }
 
@@ -792,8 +792,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value, To.all().withTimestamp(context.timestamp() + 10));
+        public void process(final Record<String, String> record) {
+            context.forward(record.withTimestamp(record.timestamp() + 10));
         }
     }
 
@@ -814,11 +814,11 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value);
-            context.forward(key, value, To.child(firstChild).withTimestamp(context.timestamp() + 5));
-            context.forward(key, value, To.child(secondChild));
-            context.forward(key, value, To.all().withTimestamp(context.timestamp() + 2));
+        public void process(final Record<String, String> record) {
+            context.forward(record);
+            context.forward(record.withTimestamp(record.timestamp() + 5), firstChild);
+            context.forward(record, secondChild);
+            context.forward(record.withTimestamp(record.timestamp() + 2));
         }
     }
 
@@ -831,9 +831,11 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.headers().add(HEADER);
-            context.forward(key, value);
+        public void process(final Record<String, String> record) {
+            // making a copy of headers for safety.
+            final Record<String, String> toForward = record.withHeaders(record.headers());
+            toForward.headers().add(HEADER);
+            context.forward(toForward);
         }
     }
 
@@ -850,8 +852,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value.split("@")[0]);
+        public void process(final Record<String, String> record) {
+            context.forward(record.withValue(record.value().split("@")[0]));
         }
     }
 
@@ -935,8 +937,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            store.put(key, value);
+        public void process(final Record<String, String> record) {
+            store.put(record.key(), record.value());
         }
     }
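
The migrations in this file lean on the copy-on-write accessors of the new api.Record class: withValue, withTimestamp, and withHeaders each return a new Record, leaving the original intact, which is why the multi-child forwarder above can reuse the same record across several forwards. A condensed sketch of the pattern:

    import org.apache.kafka.streams.processor.api.Record;

    // Each with* call yields a new Record; the original stays untouched.
    static Record<String, String> adjust(final Record<String, String> record) {
        return record
            .withValue(record.value().split("@")[0]) // replace the value
            .withTimestamp(record.timestamp() + 10); // shift the timestamp
    }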
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 6234b0f..30c7b1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
@@ -50,7 +51,7 @@ public class SinkNodeTest {
         // When/Then
         context.setTime(-1); // ensures a negative timestamp is set for the record we send next
         try {
-            illTypedSink.process("any key".getBytes(), "any value".getBytes());
+            illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1));
             fail("Should have thrown StreamsException");
         } catch (final StreamsException ignored) {
             // expected
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 30e20b0..a17e892 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -133,7 +134,7 @@ public class StreamTaskTest {
     private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
         @Override
-        public void process(final Integer key, final Integer value) {
+        public void process(final Record<Integer, Integer> record) {
             throw new RuntimeException("KABOOM!");
         }
 
@@ -467,10 +468,11 @@ public class StreamTaskTest {
                 this.context = context;
                 super.init(context);
             }
+
             @Override
-            public void process(final Integer key, final Integer value) {
-                if (key % 2 == 0) {
-                    context.forward(key, value);
+            public void process(final Record<Integer, Integer> record) {
+                if (record.key() % 2 == 0) {
+                    context.forward(record);
                 }
             }
         };
@@ -1231,10 +1233,10 @@ public class StreamTaskTest {
         task.completeRestoration();
 
         task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            task.processorContext().recordContext().headers().add("dummy", (byte[]) null);
+            task.processorContext().headers().add("dummy", (byte[]) null);
         });
         task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            assertFalse(task.processorContext().recordContext().headers().iterator().hasNext());
+            assertFalse(task.processorContext().headers().iterator().hasNext());
         });
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 575802e..4ee44ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -63,6 +63,7 @@ import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -1201,9 +1202,8 @@ public class StreamThreadTest {
         internalTopologyBuilder.addProcessor(
             "proc",
             () -> new Processor<Object, Object, Object, Object>() {
-
                 @Override
-                public void process(final Object key, final Object value) {
+                public void process(final Record<Object, Object> record) {
                     if (shouldThrow.get()) {
                         throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
                     } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
new file mode 100644
index 0000000..a702fdc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+public final class ConsumerRecordUtil {
+    private ConsumerRecordUtil() {}
+
+    public static <K, V> ConsumerRecord<K, V> record(final String topic,
+                                                     final int partition,
+                                                     final long offset,
+                                                     final K key,
+                                                     final V value) {
+        // the no-time constructor in ConsumerRecord initializes the
+        // timestamp to -1, which is an invalid configuration. Here,
+        // we initialize it to 0.
+        return new ConsumerRecord<>(
+            topic,
+            partition,
+            offset,
+            0L,
+            TimestampType.CREATE_TIME,
+            0L,
+            0,
+            0,
+            key,
+            value
+        );
+    }
+}
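
For reference, the GlobalStateTaskTest and GlobalStreamThreadTest hunks above reduce their call sites to:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import static org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil.record;

    // Same shape as the old `new ConsumerRecord<>(topic, partition, offset, key, value)`,
    // but with a valid CREATE_TIME timestamp of 0 rather than the -1 default.
    static ConsumerRecord<byte[], byte[]> sample() {
        return record("topic1", 1, 1L, "foo".getBytes(), "bar".getBytes());
    }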
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 4b3f9d5..61d27b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -130,7 +131,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
             new MockRecordCollector(),
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
         );
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
     }
 
     @After
@@ -287,7 +288,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
         bytesStore = getBytesStore();
 
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
         assertThat(
             results,
@@ -317,7 +318,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
         bytesStore = getBytesStore();
 
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
         assertThat(
             results,
@@ -336,7 +337,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         // need to create a segment so we can attempt to write to it again.
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
         bytesStore.close();
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
     }
 
@@ -365,7 +366,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
     }
 
     private void shouldRestoreToByteStore(final TaskType taskType) {
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         // 0 segments initially.
         assertEquals(0, bytesStore.getSegments().size());
         final String key = "a";
@@ -405,7 +406,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         );
         final Time time = new SystemTime();
         context.setSystemTimeMs(time.milliseconds());
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
             // write a record to advance stream time, with a high enough timestamp
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index ce3aa86..bb425a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -98,7 +99,7 @@ public abstract class AbstractSessionBytesStoreTest {
                 new MockStreamsMetrics(new Metrics())));
         context.setTime(1L);
 
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
     }
 
     @After
@@ -263,7 +264,7 @@ public abstract class AbstractSessionBytesStoreTest {
     @Test
     public void shouldFetchExactKeys() {
         sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long());
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
 
         sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
         sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
@@ -302,7 +303,7 @@ public abstract class AbstractSessionBytesStoreTest {
         final SessionStore<Bytes, String> sessionStore =
             buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String());
 
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
 
         final Bytes key1 = Bytes.wrap(new byte[] {0});
         final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
@@ -427,7 +428,7 @@ public abstract class AbstractSessionBytesStoreTest {
         final Time time = new SystemTime();
         context.setTime(1L);
         context.setSystemTimeMs(time.milliseconds());
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
             // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index a2c6b7a..6aeb28d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -111,7 +112,7 @@ public abstract class AbstractWindowBytesStoreTest {
                 new MockStreamsMetrics(new Metrics())));
         context.setTime(1L);
 
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
     }
 
     @After
@@ -713,7 +714,7 @@ public abstract class AbstractWindowBytesStoreTest {
     @SuppressWarnings("deprecation")
     public void testPutSameKeyTimestamp() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         final long startTime = SEGMENT_INTERVAL - 4L;
 
@@ -797,7 +798,7 @@ public abstract class AbstractWindowBytesStoreTest {
             Serdes.String(),
             Serdes.String());
 
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         windowStore.put("a", "0001", 0);
         windowStore.put("aa", "0002", 0);
@@ -882,7 +883,7 @@ public abstract class AbstractWindowBytesStoreTest {
             true,
             Serdes.Bytes(),
             Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         final Bytes key1 = Bytes.wrap(new byte[] {0});
         final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
@@ -970,7 +971,7 @@ public abstract class AbstractWindowBytesStoreTest {
         final Time time = new SystemTime();
         context.setSystemTimeMs(time.milliseconds());
         context.setTime(1L);
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
             // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
@@ -1111,7 +1112,7 @@ public abstract class AbstractWindowBytesStoreTest {
     @SuppressWarnings("deprecation")
     public void testFetchDuplicates() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         long currentTime = 0;
         setCurrentTime(currentTime);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index d0e10d5..98f0ba6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -77,7 +78,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(null, null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        store.init(context, null);
+        store.init((StateStoreContext) context, null);
     }
 
     @After
@@ -178,7 +179,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         cache = EasyMock.niceMock(ThreadCache.class);
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index d8e97b8..f36629d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -90,7 +91,7 @@ public class CachingSessionStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     @After
@@ -186,7 +187,7 @@ public class CachingSessionStoreTest {
         cache = EasyMock.niceMock(ThreadCache.class);
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 6c4ddf6..42b750b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -104,7 +105,7 @@ public class CachingWindowStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     @After
@@ -886,7 +887,7 @@ public class CachingWindowStoreTest {
         cache = EasyMock.createNiceMock(ThreadCache.class);
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String key, final String value, final long timestamp) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index d2d1d73..9106580 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
@@ -55,7 +56,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
             collector,
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 5ab035c..426a334 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.SessionStore;
@@ -55,11 +57,11 @@ public class ChangeLoggingSessionBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index f630abb..e05b171 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -60,7 +61,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
             collector,
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 4a240b1..9de2207 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -57,11 +59,11 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 0728a1e..c877ac6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -55,11 +57,11 @@ public class ChangeLoggingWindowBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 27dcff4..736721a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -79,7 +80,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
             Serdes.String(), Serdes.String()), new MockRecordCollector());
         context.setTime(1L);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         return store;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 62059f1..fd427e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -100,7 +101,7 @@ public class GlobalStateStoreProviderTest {
         expect(mockContext.recordCollector()).andStubReturn(null);
         replay(mockContext);
         for (final StateStore store : stores.values()) {
-            store.init(mockContext, null);
+            store.init((StateStoreContext) mockContext, null);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index f639255..83fffef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -154,7 +155,7 @@ public class MeteredKeyValueStoreTest {
 
     private void init() {
         replay(inner, context);
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
     }
 
     @Test
@@ -190,7 +191,7 @@ public class MeteredKeyValueStoreTest {
             keySerde,
             valueSerde
         );
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
 
         metered.get(KEY);
         metered.put(KEY, VALUE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 28efbf9..0ff822e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -158,7 +159,7 @@ public class MeteredSessionStoreTest {
 
     private void init() {
         replay(innerStore, context);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
@@ -195,7 +196,7 @@ public class MeteredSessionStoreTest {
             valueSerde,
             new MockTime()
         );
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP);
         store.put(WINDOWED_KEY, VALUE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index c03ffef..3d28266 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -158,7 +159,7 @@ public class MeteredTimestampedKeyValueStoreTest {
 
     private void init() {
         replay(inner, context);
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
     }
 
     @Test
@@ -194,7 +195,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             keySerde,
             valueSerde
         );
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
 
         metered.get(KEY);
         metered.put(KEY, VALUE_AND_TIMESTAMP);
@@ -430,7 +431,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             null
         );
         replay(inner, context);
-        store.init(context, inner);
+        store.init((StateStoreContext) context, inner);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
@@ -455,7 +456,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             new ValueAndTimestampSerde<>(Serdes.Long())
         );
         replay(inner, context);
-        store.init(context, inner);
+        store.init((StateStoreContext) context, inner);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 18be067..9a9d763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -129,7 +130,7 @@ public class MeteredTimestampedWindowStoreTest {
             keySerde,
             valueSerde
         );
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.fetch(KEY, TIMESTAMP);
         store.put(KEY, VALUE_AND_TIMESTAMP, TIMESTAMP);
@@ -143,7 +144,7 @@ public class MeteredTimestampedWindowStoreTest {
         EasyMock.expectLastCall();
         EasyMock.replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.close();
         EasyMock.verify(innerStoreMock);
     }
@@ -153,7 +154,7 @@ public class MeteredTimestampedWindowStoreTest {
         EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
         EasyMock.replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         assertNull(store.fetch("a", 0));
     }
 
@@ -170,7 +171,7 @@ public class MeteredTimestampedWindowStoreTest {
             null,
             null
         );
-        store.init(context, innerStoreMock);
+        store.init((StateStoreContext) context, innerStoreMock);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
@@ -195,7 +196,7 @@ public class MeteredTimestampedWindowStoreTest {
             Serdes.String(),
             new ValueAndTimestampSerde<>(Serdes.Long())
         );
-        store.init(context, innerStoreMock);
+        store.init((StateStoreContext) context, innerStoreMock);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 8671521..7301694 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -184,7 +185,7 @@ public class MeteredWindowStoreTest {
             keySerde,
             valueSerde
         );
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.fetch(KEY, TIMESTAMP);
         store.put(KEY, VALUE, TIMESTAMP);
@@ -195,7 +196,7 @@ public class MeteredWindowStoreTest {
     @Test
     public void testMetrics() {
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         final JmxReporter reporter = new JmxReporter();
         final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
         reporter.contextChange(metricsContext);
@@ -225,10 +226,10 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordRestoreLatencyOnInit() {
-        innerStoreMock.init(context, store);
+        innerStoreMock.init((StateStoreContext) context, store);
         expectLastCall();
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
             assertEquals(1.0, getMetricByNameFilterByTags(
@@ -254,7 +255,7 @@ public class MeteredWindowStoreTest {
         expectLastCall();
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.put("a", "a");
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -279,7 +280,7 @@ public class MeteredWindowStoreTest {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -304,7 +305,7 @@ public class MeteredWindowStoreTest {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -330,7 +331,7 @@ public class MeteredWindowStoreTest {
         expectLastCall();
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.flush();
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -355,7 +356,7 @@ public class MeteredWindowStoreTest {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         assertNull(store.fetch("a", 0));
     }
 
@@ -393,7 +394,7 @@ public class MeteredWindowStoreTest {
         innerStoreMock.close();
         expectLastCall();
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.close();
         verify(innerStoreMock);
@@ -404,7 +405,7 @@ public class MeteredWindowStoreTest {
         innerStoreMock.close();
         expectLastCall();
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         assertThat(storeMetrics(), not(empty()));
         store.close();
@@ -417,7 +418,7 @@ public class MeteredWindowStoreTest {
         innerStoreMock.close();
         expectLastCall().andThrow(new RuntimeException("Oops!"));
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         // There's always a "count" metric registered
         assertThat(storeMetrics(), not(empty()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index ca28181..dead979 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -274,7 +275,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8));
         rocksDBStore.flush();
 
@@ -297,7 +298,7 @@ public class RocksDBStoreTest {
     public void shouldCallRocksDbConfigSetter() {
         MockRocksDbConfigSetter.called = false;
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
@@ -325,7 +326,7 @@ public class RocksDBStoreTest {
             new Bytes(stringSerializer.serialize(null, "3")),
             stringSerializer.serialize(null, "c")));
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         rocksDBStore.putAll(entries);
         rocksDBStore.flush();
 
@@ -350,7 +351,7 @@ public class RocksDBStoreTest {
     public void shouldRestoreAll() {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
         assertEquals(
@@ -372,7 +373,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldPutOnlyIfAbsentValue() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
         final byte[] valueBytes = stringSerializer.serialize(null, "A");
         final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B");
@@ -389,7 +390,7 @@ public class RocksDBStoreTest {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
         entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
         final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
@@ -413,7 +414,7 @@ public class RocksDBStoreTest {
         // this will restore key "1" as WriteBatch applies updates in order
         entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8)));
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
         final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
@@ -446,7 +447,7 @@ public class RocksDBStoreTest {
     public void shouldRestoreThenDeleteOnRestoreAll() {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         context.restore(rocksDBStore.name(), entries);
 
@@ -486,7 +487,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPut() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
@@ -494,7 +495,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPutAll() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
@@ -502,7 +503,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullGet() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.get(null));
@@ -510,7 +511,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnDelete() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.delete(null));
@@ -518,7 +519,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnRange() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2"))));
@@ -526,7 +527,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         Utils.delete(dir);
         rocksDBStore.put(
             new Bytes(stringSerializer.serialize(null, "anyKey")),
@@ -545,7 +546,7 @@ public class RocksDBStoreTest {
             new StreamsConfig(props));
 
         enableBloomFilters = false;
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         final List<String> expectedValues = new ArrayList<>();
         expectedValues.add("a");
@@ -570,7 +571,7 @@ public class RocksDBStoreTest {
         // reopen with Bloom Filters enabled
         // should open fine without errors
         enableBloomFilters = true;
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
             final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
@@ -596,7 +597,7 @@ public class RocksDBStoreTest {
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
         EasyMock.replay(context);
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         final byte[] key = "hello".getBytes();
         final byte[] value = "world".getBytes();
         rocksDBStore.put(Bytes.wrap(key), value);
@@ -628,7 +629,7 @@ public class RocksDBStoreTest {
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
         EasyMock.replay(context);
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         final byte[] key = "hello".getBytes();
         final byte[] value = "world".getBytes();
         rocksDBStore.put(Bytes.wrap(key), value);
@@ -659,7 +660,7 @@ public class RocksDBStoreTest {
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
         EasyMock.replay(context);
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         final List<String> propertyNames = Arrays.asList(
             "num-entries-active-mem-table",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 042039c..75b4e80 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.hamcrest.core.IsNull;
@@ -49,7 +50,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
     @Test
     public void shouldOpenNewStoreInRegularMode() {
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
         }
@@ -62,13 +63,13 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
     @Test
     public void shouldOpenExistingStoreInRegularMode() throws Exception {
         // prepare store
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         rocksDBStore.put(new Bytes("key".getBytes()), "timestamped".getBytes());
         rocksDBStore.close();
 
         // re-open store
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
         } finally {
@@ -121,7 +122,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
         prepareOldStore();
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
         }
@@ -404,7 +405,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
         // check that still in upgrade mode
         try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
         } finally {
@@ -438,7 +439,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
         // check that still in regular mode
         try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
         }
@@ -447,7 +448,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
     private void prepareOldStore() {
         final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
         try {
-            keyValueStore.init(context, keyValueStore);
+            keyValueStore.init((StateStoreContext) context, keyValueStore);
 
             keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes());
             keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c7021fd..9e9b0b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -389,7 +390,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
 
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(),
             Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         context.setTime(0L);
         setCurrentTime(0);
@@ -479,7 +480,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
         windowStore.close();
 
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         // put something in the store to advance its stream time and expire the old segments
         windowStore.put(1, "v", 6L * SEGMENT_INTERVAL);
@@ -546,7 +547,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                                        false,
                                        Serdes.Integer(),
                                        Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         assertEquals(
             new HashSet<>(Collections.emptyList()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 7df1404..97593e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -63,8 +64,8 @@ public class SegmentIteratorTest {
                 new LogContext("testCache "),
                 0,
                 new MockStreamsMetrics(new Metrics())));
-        segmentOne.init(context, segmentOne);
-        segmentTwo.init(context, segmentTwo);
+        segmentOne.init((StateStoreContext) context, segmentOne);
+        segmentTwo.init((StateStoreContext) context, segmentTwo);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
         segmentOne.put(Bytes.wrap("b".getBytes()), "2".getBytes());
         segmentTwo.put(Bytes.wrap("c".getBytes()), "3".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index a054ac9..0037fbb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -123,7 +124,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldInit() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         cleanup(context, buffer);
     }
 
@@ -131,7 +132,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldAcceptData() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
         cleanup(context, buffer);
     }
@@ -140,7 +141,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRejectNullValues() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         try {
             buffer.put(0, "asdf", null, getContext(0));
             fail("expected an exception");
@@ -154,7 +155,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRemoveData() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
         assertThat(buffer.numRecords(), is(1));
         buffer.evictWhile(() -> true, kv -> { });
@@ -166,7 +167,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRespectEvictionPredicate() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
         putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
         assertThat(buffer.numRecords(), is(2));
@@ -183,7 +184,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldTrackCount() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "oin");
         assertThat(buffer.numRecords(), is(1));
         putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
@@ -197,7 +198,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldTrackSize() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
         assertThat(buffer.bufferSize(), is(43L));
         putRecord(buffer, context, 1L, 0L, "asdf", "3l");
@@ -211,7 +212,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldTrackMinTimestamp() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
         assertThat(buffer.minTimestamp(), is(1L));
         putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
@@ -223,7 +224,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
@@ -269,7 +270,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldReturnUndefinedOnPriorValueForNotBufferedKey() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         assertThat(buffer.priorValueForBuffered("ASDF"), is(Maybe.undefined()));
     }
@@ -278,7 +279,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldReturnPriorValueForBufferedKey() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final ProcessorRecordContext recordContext = getContext(0L);
         context.setRecordContext(recordContext);
@@ -292,7 +293,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldFlush() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
         putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
         putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
@@ -363,7 +364,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreOldUnversionedFormat() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -477,7 +478,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV1Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -598,7 +599,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV2Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -721,7 +722,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         // Note the data is the same as the V3 test.
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -841,7 +842,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV3Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -961,7 +962,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldNotRestoreUnrecognizedVersionRecord() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 51b9231..6086f97 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -261,9 +262,10 @@ public class InternalMockProcessorContext
         stateManager().registerStore(store, func);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
-        return storeMap.get(name);
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) storeMap.get(name);
     }
 
     @Override
@@ -283,6 +285,28 @@ public class InternalMockProcessorContext
     public void commit() {}
 
     @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
+            setTime(record.timestamp());
+        }
+        final ProcessorNode<?, ?, ?, ?> thisNode = currentNode;
+        try {
+            for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
+                currentNode = childNode;
+                ((ProcessorNode<K, V, ?, ?>) childNode).process(record);
+            }
+        } finally {
+            currentNode = thisNode;
+        }
+    }
+
+    @Override
     public void forward(final Object key, final Object value) {
         forward(key, value, To.all());
     }
@@ -311,7 +335,8 @@ public class InternalMockProcessorContext
             for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
                 if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
                     currentNode = childNode;
-                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(key, value);
+                    final Record<Object, Object> record = new Record<>(key, value, toInternal.timestamp(), headers());
+                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record);
                     toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
                                            // Processors and toInternal might have been modified
                 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index 262aecc..dd56bad 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.time.Duration;
@@ -65,25 +66,19 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
             scheduleCancellable = context.schedule(
                 Duration.ofMillis(scheduleInterval),
                 punctuationType,
-                timestamp -> {
-                    if (punctuationType == PunctuationType.STREAM_TIME) {
-                        assertThat(context.timestamp(), is(timestamp));
-                    }
-                    assertThat(context.partition(), is(-1));
-                    assertThat(context.offset(), is(-1L));
-
-                    (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
-                        .add(timestamp);
-                });
+                (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)::add
+            );
         }
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, context.timestamp());
+    public void process(final Record<KIn, VIn> record) {
+        final KIn key = record.key();
+        final VIn value = record.value();
+        final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
 
         if (value != null) {
-            lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context.timestamp()));
+            lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, record.timestamp()));
         } else {
             lastValueAndTimestampPerKey.remove(key);
         }
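
Two things change in MockApiProcessor: process(Record) now reads key, value, and timestamp off
the record instead of the context, and the punctuation callback collapses to a method
reference, since Punctuator is a functional interface over a single long timestamp. The old
per-punctuation assertions on context.timestamp(), partition(), and offset() are dropped,
presumably because the new API no longer exposes sentinel values outside of record processing.
A sketch of the method-reference form, assuming a ProcessorContext in scope and a List<Long>
collector:

    final List<Long> punctuated = new ArrayList<>();
    context.schedule(Duration.ofMillis(100), PunctuationType.STREAM_TIME, punctuated::add);
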
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 58b90c1..370dca7 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -33,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.io.File;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
@@ -67,11 +71,26 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     }
 
     @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record.key(), record.value(), To.all().withTimestamp(record.timestamp()));
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        forward(record.key(), record.value(), To.child(childName).withTimestamp(record.timestamp()));
+    }
+
+    @Override
     public ProcessorRecordContext recordContext() {
         return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
     }
 
     @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.of(recordContext());
+    }
+
+    @Override
     public void setRecordContext(final ProcessorRecordContext recordContext) {
         setRecordMetadata(
             recordContext.topic(),
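
This mock bridges in the opposite direction: the new forward(Record) calls are translated onto
the old forward(key, value, To) path, carrying the record's timestamp via To.withTimestamp,
and record metadata is exposed through the new Optional-returning accessor. A hypothetical
consumer-side sketch:

    // Metadata is only present while an actual record is being processed.
    context.recordMetadata().ifPresent(metadata ->
        System.out.println(metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset()));
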
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 17391bc..f18b763 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -21,8 +21,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorContextAdapter;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.ArrayList;
@@ -41,15 +40,16 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
         delegate = new MockApiProcessor<>();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context) {
         super.init(context);
-        delegate.init(ProcessorContextAdapter.adapt((InternalProcessorContext) context));
+        delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context);
     }
 
     @Override
     public void process(final K key, final V value) {
-        delegate.process(key, value);
+        delegate.process(new Record<>(key, value, context.timestamp(), context.headers()));
     }
 
     public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
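
MockProcessor shows the adapter pattern for old-API processors delegating to new-API ones: the
fields the old API kept implicit on the context (timestamp, headers) are materialized into a
Record at the call boundary. A sketch of the Record constructor this relies on, per the
api.Record class added by this commit:

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.api.Record;

    // key, value, timestamp, and headers all travel with the record now
    final Record<String, Long> record = new Record<>("key", 1L, 42L, new RecordHeaders());
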
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 9a8b407..a75c250 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
@@ -58,8 +59,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        processor().process(key, value);
+    public void process(final Record<KIn, VIn> record) {
+        processor().process(record);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 5130d46..9d22e3b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
@@ -39,10 +40,10 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        this.numReceived++;
-        this.keys.add(key);
-        this.values.add(value);
+    public void process(final Record<KIn, VIn> record) {
+        numReceived++;
+        keys.add(record.key());
+        values.add(record.value());
     }
 
     @Override
@@ -54,6 +55,6 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
     @Override
     public void close() {
         super.close();
-        this.closed = true;
+        closed = true;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index b2b62ee..374f8ec 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 
@@ -65,7 +66,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         return null;
     }
 
@@ -85,13 +86,23 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record.key(), record.value());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        forward(record.key(), record.value());
+    }
+
+    @Override
     public <K, V> void forward(final K key, final V value) {
         forwardedValues.put(key, value);
     }
 
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        forwardedValues.put(key, value);
+        forward(key, value);
     }
 
     @Override
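
Note that NoOpProcessorContext deliberately drops the Record's timestamp and headers: both new
forward variants funnel into the key/value map, and forward(key, value, To) now delegates
instead of duplicating the put, so every forwarded value remains observable through a single
path in tests.
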
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 54b09de..1b17f67 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -59,7 +58,6 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -444,13 +442,7 @@ public class TopologyTestDriver implements Closeable {
                 new LogAndContinueExceptionHandler()
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(new ProcessorRecordContext(
-                0L,
-                -1L,
-                -1,
-                ProcessorContextImpl.NONEXIST_TOPIC,
-                new RecordHeaders())
-            );
+            globalProcessorContext.setRecordContext(null);
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -510,13 +502,7 @@ public class TopologyTestDriver implements Closeable {
             );
             task.initializeIfNeeded();
             task.completeRestoration();
-            task.processorContext().setRecordContext(new ProcessorRecordContext(
-                0L,
-                -1L,
-                -1,
-                ProcessorContextImpl.NONEXIST_TOPIC,
-                new RecordHeaders())
-            );
+            task.processorContext().setRecordContext(null);
         } else {
             task = null;
         }
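
Passing null here replaces the old placeholder ProcessorRecordContext built
from NONEXIST_TOPIC sentinels: under the new API, "no current record" is
represented directly, and processors observe it as an empty Optional from
recordMetadata(). A sketch of the defensive access pattern, matching what the
MockProcessor change below does:

    import java.util.Optional;
    import org.apache.kafka.streams.processor.api.RecordMetadata;

    // Metadata is only present while a real input record is flowing through;
    // it is absent in punctuators and, after this change, in a freshly
    // initialized TopologyTestDriver, so callers supply their own fallbacks
    final Optional<RecordMetadata> metadata = context.recordMetadata();
    final long offset = metadata.map(RecordMetadata::offset).orElse(-1L);
    final String topic = metadata.map(RecordMetadata::topic).orElse(null);
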
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 8e546e0..88e1660 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -401,9 +401,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         stateStores.put(store.name(), store);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
-        return stateStores.get(name);
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
     }
 
     @Override
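
The generified getStateStore moves the unchecked cast from every call site
into the context (hence the @SuppressWarnings above). Roughly, call sites
migrate like this, reusing the aggStore name from the test below:

    // Old signature: the cast lives at each call site
    // final KeyValueStore<String, Long> store =
    //         (KeyValueStore<String, Long>) context.getStateStore("aggStore");

    // New signature: inference from the assignment target supplies the cast;
    // a mismatched type still surfaces as a ClassCastException at runtime,
    // exactly as before
    final KeyValueStore<String, Long> store = context.getStateStore("aggStore");
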
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 7ae43ef..8cf38c9 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -41,10 +41,11 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -148,7 +149,7 @@ public class TopologyTestDriverTest {
         }
     }
 
-    private final static class Record {
+    private final static class TTDTestRecord {
         private final Object key;
         private final Object value;
         private final long timestamp;
@@ -156,8 +157,8 @@ public class TopologyTestDriverTest {
         private final String topic;
         private final Headers headers;
 
-        Record(final ConsumerRecord<byte[], byte[]> consumerRecord,
-               final long newOffset) {
+        TTDTestRecord(final ConsumerRecord<byte[], byte[]> consumerRecord,
+                      final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
             timestamp = consumerRecord.timestamp();
@@ -166,9 +167,9 @@ public class TopologyTestDriverTest {
             headers = consumerRecord.headers();
         }
 
-        Record(final String newTopic,
-               final TestRecord<byte[], byte[]> consumerRecord,
-               final long newOffset) {
+        TTDTestRecord(final String newTopic,
+                      final TestRecord<byte[], byte[]> consumerRecord,
+                      final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
             timestamp = consumerRecord.timestamp();
@@ -177,12 +178,12 @@ public class TopologyTestDriverTest {
             headers = consumerRecord.headers();
         }
 
-        Record(final Object key,
-               final Object value,
-               final Headers headers,
-               final long timestamp,
-               final long offset,
-               final String topic) {
+        TTDTestRecord(final Object key,
+                      final Object value,
+                      final Headers headers,
+                      final long timestamp,
+                      final long offset,
+                      final String topic) {
             this.key = key;
             this.value = value;
             this.headers = headers;
@@ -204,7 +205,7 @@ public class TopologyTestDriverTest {
             if (o == null || getClass() != o.getClass()) {
                 return false;
             }
-            final Record record = (Record) o;
+            final TTDTestRecord record = (TTDTestRecord) o;
             return timestamp == record.timestamp &&
                 offset == record.offset &&
                 Objects.equals(key, record.key) &&
@@ -248,7 +249,7 @@ public class TopologyTestDriverTest {
 
         private boolean initialized = false;
         private boolean closed = false;
-        private final List<Record> processedRecords = new ArrayList<>();
+        private final List<TTDTestRecord> processedRecords = new ArrayList<>();
 
         MockProcessor(final Collection<Punctuation> punctuations) {
             this.punctuations = punctuations;
@@ -264,9 +265,16 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void process(final Object key, final Object value) {
-            processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic()));
-            context.forward(key, value);
+        public void process(final Record<Object, Object> record) {
+            processedRecords.add(new TTDTestRecord(
+                record.key(),
+                record.value(),
+                record.headers(),
+                record.timestamp(),
+                context.recordMetadata().map(RecordMetadata::offset).orElse(-1L),
+                context.recordMetadata().map(RecordMetadata::topic).orElse(null)
+            ));
+            context.forward(record);
         }
 
         @Override
@@ -399,8 +407,8 @@ public class TopologyTestDriverTest {
                     }
 
                     @Override
-                    public void process(final Object key, final Object value) {
-                        store.put(key, value);
+                    public void process(final Record<Object, Object> record) {
+                        store.put(record.key(), record.value());
                     }
                 }
             );
@@ -578,11 +586,11 @@ public class TopologyTestDriverTest {
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
-        final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords = mockProcessors.get(0).processedRecords;
         assertEquals(1, processedRecords.size());
 
-        final Record record = processedRecords.get(0);
-        final Record expectedResult = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
+        final TTDTestRecord record = processedRecords.get(0);
+        final TTDTestRecord expectedResult = new TTDTestRecord(SOURCE_TOPIC_1, testRecord1, 0L);
 
         assertThat(record, equalTo(expectedResult));
     }
@@ -598,16 +606,16 @@ public class TopologyTestDriverTest {
     public void shouldSendRecordViaCorrectSourceTopicDeprecated() {
         testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
 
-        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
-        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+        final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
 
         testDriver.pipeInput(consumerRecord1);
 
         assertEquals(1, processedRecords1.size());
         assertEquals(0, processedRecords2.size());
 
-        Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1, 0L);
+        TTDTestRecord record = processedRecords1.get(0);
+        TTDTestRecord expectedResult = new TTDTestRecord(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         testDriver.pipeInput(consumerRecord2);
@@ -616,7 +624,7 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords2.size());
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2, 0L);
+        expectedResult = new TTDTestRecord(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -841,8 +849,8 @@ public class TopologyTestDriverTest {
     public void shouldProcessConsumerRecordList() {
         testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
 
-        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
-        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+        final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
 
         final List<ConsumerRecord<byte[], byte[]>> testRecords = new ArrayList<>(2);
         testRecords.add(consumerRecord1);
@@ -853,12 +861,12 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords1.size());
         assertEquals(1, processedRecords2.size());
 
-        Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1, 0L);
+        TTDTestRecord record = processedRecords1.get(0);
+        TTDTestRecord expectedResult = new TTDTestRecord(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2, 0L);
+        expectedResult = new TTDTestRecord(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -1446,24 +1454,24 @@ public class TopologyTestDriverTest {
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
-            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore());
-            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> flushStore());
+            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
             store = context.getStateStore("aggStore");
         }
 
         @Override
-        public void process(final String key, final Long value) {
-            final Long oldValue = store.get(key);
-            if (oldValue == null || value > oldValue) {
-                store.put(key, value);
+        public void process(final Record<String, Long> record) {
+            final Long oldValue = store.get(record.key());
+            if (oldValue == null || record.value() > oldValue) {
+                store.put(record.key(), record.value());
             }
         }
 
-        private void flushStore() {
+        private void flushStore(final long timestamp) {
             try (final KeyValueIterator<String, Long> it = store.all()) {
                 while (it.hasNext()) {
                     final KeyValue<String, Long> next = it.next();
-                    context.forward(next.key, next.value);
+                    context.forward(new Record<>(next.key, next.value, timestamp));
                 }
             }
         }
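
The punctuator change above is more than cosmetic: the old lambda discarded
the punctuation timestamp, and forward() reused whatever record context
happened to be current, whereas the new method reference threads the
timestamp into each outgoing Record explicitly. In sketch form:

    // Before: the timestamp argument was dropped, and the forwarded record's
    // timestamp was implicit
    //   context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME,
    //       timestamp -> flushStore());
    //   ... context.forward(next.key, next.value);

    // After: the Punctuator's long parameter matches flushStore's signature,
    // and the timestamp is stamped onto each outgoing Record
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
    context.forward(new Record<>(next.key, next.value, timestamp));
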
@@ -1505,8 +1513,8 @@ public class TopologyTestDriverTest {
                         }
 
                         @Override
-                        public void process(final String key, final Long value) {
-                            store.put(key, value);
+                        public void process(final Record<String, Long> record) {
+                            store.put(record.key(), record.value());
                         }
                     };
                 }
@@ -1589,16 +1597,16 @@ public class TopologyTestDriverTest {
 
         testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), config);
 
-        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
-        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+        final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
         assertEquals(1, processedRecords1.size());
         assertEquals(0, processedRecords2.size());
 
-        final Record record1 = processedRecords1.get(0);
-        final Record expectedResult1 = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
+        final TTDTestRecord record1 = processedRecords1.get(0);
+        final TTDTestRecord expectedResult1 = new TTDTestRecord(SOURCE_TOPIC_1, testRecord1, 0L);
         assertThat(record1, equalTo(expectedResult1));
 
         pipeRecord(consumerTopic2, consumerRecord2);
@@ -1606,8 +1614,8 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords1.size());
         assertEquals(1, processedRecords2.size());
 
-        final Record record2 = processedRecords2.get(0);
-        final Record expectedResult2 = new Record(consumerTopic2, consumerRecord2, 0L);
+        final TTDTestRecord record2 = processedRecords2.get(0);
+        final TTDTestRecord expectedResult2 = new TTDTestRecord(consumerTopic2, consumerRecord2, 0L);
         assertThat(record2, equalTo(expectedResult2));
     }
 
@@ -1694,11 +1702,12 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record) {
+                    final String value = record.value();
                     if (!value.startsWith("recurse-")) {
-                        context.forward(key, "recurse-" + value, To.child("recursiveSink"));
+                        context.forward(record.withValue("recurse-" + value), "recursiveSink");
                     }
-                    context.forward(key, value, To.child("sink"));
+                    context.forward(record, "sink");
                 }
             },
             "source"
@@ -1751,8 +1760,8 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
-                    stateStore.put(key, value);
+                public void process(final Record<String, String> record) {
+                    stateStore.put(record.key(), record.value());
                 }
             }
         );
@@ -1767,12 +1776,13 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record) {
+                    final String value = record.value();
                     if (!value.startsWith("recurse-")) {
-                        context.forward(key, "recurse-" + value, To.child("recursiveSink"));
+                        context.forward(record.withValue("recurse-" + value), "recursiveSink");
                     }
-                    context.forward(key, value, To.child("sink"));
-                    context.forward(key, value, To.child("globalSink"));
+                    context.forward(record, "sink");
+                    context.forward(record, "globalSink");
                 }
             },
             "source"