You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/01 17:06:17 UTC

[kafka] branch kip-478-part-4-record-processor created (now d3321aa)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch kip-478-part-4-record-processor
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at d3321aa  move metadata to context

This branch includes the following new commits:

     new 22b9c3a  KAFKA-10535: Split ProcessorContext into Processor/StateStore/RecordContext
     new d3321aa  move metadata to context

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 02/02: move metadata to context

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-4-record-processor
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d3321aa0d199c80b04ab9e1121b4501b333db7a5
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 1 12:06:18 2020 -0500

    move metadata to context
---
 .../examples/docs/DeveloperGuideTesting.java       | 10 ++++-----
 .../kafka/streams/processor/api/Processor.java     |  9 +-------
 .../streams/processor/api/ProcessorContext.java    | 10 +++++++++
 .../internals/AbstractProcessorContext.java        |  7 ++++++
 .../internals/GlobalProcessorContextImpl.java      |  5 +----
 .../processor/internals/GlobalStateUpdateTask.java |  6 +----
 .../processor/internals/ProcessorAdapter.java      |  5 +----
 .../processor/internals/ProcessorContextImpl.java  |  6 +----
 .../streams/processor/internals/ProcessorNode.java |  8 +++----
 .../streams/processor/internals/SinkNode.java      | 26 ++++++----------------
 .../streams/processor/internals/SourceNode.java    |  5 +----
 .../streams/processor/internals/StreamTask.java    |  5 +----
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  6 ++---
 .../apache/kafka/streams/StreamsBuilderTest.java   |  4 +---
 .../org/apache/kafka/streams/TopologyTest.java     |  4 +---
 .../internals/GlobalProcessorContextImplTest.java  |  2 +-
 .../processor/internals/ProcessorNodeTest.java     |  6 ++---
 .../processor/internals/ProcessorTopologyTest.java | 14 +++++-------
 .../streams/processor/internals/SinkNodeTest.java  |  2 +-
 .../processor/internals/StreamTaskTest.java        |  6 ++---
 .../processor/internals/StreamThreadTest.java      |  4 +---
 .../kafka/test/InternalMockProcessorContext.java   |  5 ++---
 .../org/apache/kafka/test/MockApiProcessor.java    |  4 +---
 .../kafka/test/MockInternalProcessorContext.java   |  7 ++++++
 .../java/org/apache/kafka/test/MockProcessor.java  |  7 +-----
 .../org/apache/kafka/test/MockProcessorNode.java   |  6 ++---
 .../java/org/apache/kafka/test/MockSourceNode.java |  4 +---
 .../kafka/streams/TopologyTestDriverTest.java      | 19 ++++++++--------
 28 files changed, 78 insertions(+), 124 deletions(-)

diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 1ce3445..72e704e 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -21,26 +21,24 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -170,7 +168,7 @@ public class DeveloperGuideTesting {
         }
 
         @Override
-        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, Long> record) {
             final Long oldValue = store.get(record.key());
             if (oldValue == null || record.value() > oldValue) {
                 store.put(record.key(), record.value());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
index e6feccb..167976b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.time.Duration;
-import java.util.Optional;
 
 /**
  * A processor of key-value pair records.
@@ -50,14 +49,8 @@ public interface Processor<KIn, VIn, KOut, VOut> {
      * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
      *
      * @param record the record to process
-     * @param recordMetadata the metadata of the record, if it is defined. Note that as long as the processor is
-     *                       receiving a record downstream of a Source (i.e., the current record is coming from an
-     *                       input topic), the metadata is defined. On the other hand, if a parent processor has
-     *                       registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
-     *                       punctuator, then there is no record from an input topic, and therefore the metadata
-     *                       would be undefined.
      */
-    void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata);
+    void process(Record<KIn, VIn> record);
 
     /**
      * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index f4f0fdb..a431993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -53,6 +53,16 @@ public interface ProcessorContext<KForward, VForward> {
     TaskId taskId();
 
     /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is
+     * receiving a record downstream of a Source (i.e., the current record is coming from an
+     * input topic), the metadata is defined. On the other hand, if a parent processor has
+     * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
+     * punctuator, then there is no record from an input topic, and therefore the metadata
+     * would be undefined.
+     */
+    Optional<RecordMetadata> recordMetadata();
+
+    /**
      * Returns the default key serde.
      *
      * @return the key serializer
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index ef222e0..c29614a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -31,6 +32,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
@@ -201,6 +203,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordContext);
+    }
+
+    @Override
     public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
         this.currentNode = currentNode;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 9f31309..3468833 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -25,13 +25,11 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
-import java.util.Optional;
 
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
@@ -63,11 +61,10 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     @Override
     public <K, V> void forward(final Record<K, V> record) {
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
-        final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
         try {
             for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<K, V, ?, ?>) child).process(record, recordMetadata);
+                ((ProcessorNode<K, V, ?, ?>) child).process(record);
             }
         } finally {
             setCurrentNode(previousNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 360e50e..6b1378b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
@@ -112,10 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(
-                toProcess,
-                Optional.of(recordContext)
-            );
+            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
         }
 
         offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index 291a99e..84c8602 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -20,9 +20,6 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-
-import java.util.Optional;
 
 public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
     private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
@@ -57,7 +54,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         final ProcessorRecordContext processorRecordContext = context.recordContext();
         try {
             context.setRecordContext(new ProcessorRecordContext(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 309b813..d164428 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -38,7 +37,6 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
@@ -263,9 +261,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                         final Record<K, V> record) {
         setCurrentNode(child);
 
-        final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
-
-        child.process(record, recordMetadata);
+        child.process(record);
 
         if (child.isTerminalNode()) {
             streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 2939525..38daa52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -20,11 +20,10 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
@@ -32,7 +31,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -176,11 +174,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
     }
 
 
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         throwIfClosed();
 
         try {
-            maybeMeasureLatency(() -> processor.process(record, recordMetadata), time, processSensor);
+            maybeMeasureLatency(() -> processor.process(record), time, processSensor);
         } catch (final ClassCastException e) {
             final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
             final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index f8840e4..813bcb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -22,9 +22,6 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-
-import java.util.Optional;
 
 public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
@@ -81,7 +78,7 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
 
         final KIn key = record.key();
@@ -96,22 +93,13 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
             );
         }
 
-        // Prefer the record metadata if defined,
-        // and fall back to the context (which is undefined and dummy values,
-        // but extractors may still depend on the current behavior.
-        final Optional<ProcessorRecordContext> maybeContext =
-            recordMetadata.map(
-                m -> new ProcessorRecordContext(timestamp, m.offset(), m.partition(), m.topic(), record.headers())
-            );
         final ProcessorRecordContext contextForExtraction =
-            maybeContext.orElseGet(
-                () -> new ProcessorRecordContext(
-                    timestamp,
-                    context.offset(),
-                    context.partition(),
-                    context.topic(),
-                    record.headers()
-                )
+            new ProcessorRecordContext(
+                timestamp,
+                context.offset(),
+                context.partition(),
+                context.topic(),
+                record.headers()
             );
 
         final String topic = topicExtractor.extract(key, value, contextForExtraction);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 711b4c3..7fa8c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -22,11 +22,8 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
-import java.util.Optional;
-
 public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private InternalProcessorContext context;
@@ -96,7 +93,7 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
 
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         context.forward(record);
         processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c4e4ff3..464ac9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@@ -56,7 +55,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -692,8 +690,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext());
-            maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor);
+            maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
 
             log.trace("Completed processing one record [{}]", record);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 7ff2c6c..38baeb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -78,7 +77,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Executors;
@@ -869,7 +867,7 @@ public class KafkaStreamsTest {
                     }
 
                     @Override
-                    public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                    public void process(final Record<String, String> record) {
                         if (record.value().length() % 2 == 0) {
                             context.forward(record.withValue(record.key() + record.value()));
                         }
@@ -970,7 +968,7 @@ public class KafkaStreamsTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
                     kvStore.put(record.key(), 5L);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index b308b4f..415aaea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -60,7 +59,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
 import static java.util.Arrays.asList;
@@ -109,7 +107,7 @@ public class StreamsBuilderTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     store.put(record.key(), record.value());
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 9e9f415..ef9becf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -46,7 +45,6 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -379,7 +377,7 @@ public class TopologyTest {
                 }
 
                 @Override
-                public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { }
+                public void process(final Record<Object, Object> record) { }
             };
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 6322fd2..a83c92b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -100,7 +100,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldForwardToSingleChild() {
-        child.process(anyObject(), anyObject());
+        child.process(anyObject());
         expectLastCall();
 
         expect(recordContext.timestamp()).andStubReturn(0L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index a4efcbc..8ff9451 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.Optional;
-import java.util.Properties;
-
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -40,6 +37,7 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -208,7 +206,7 @@ public class ProcessorNodeTest {
         node.init(context);
         final StreamsException se = assertThrows(
             StreamsException.class,
-            () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()), Optional.ofNullable(context.recordContext()))
+            () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()))
         );
         assertThat(se.getCause(), instanceOf(ClassCastException.class));
         assertThat(se.getMessage(), containsString("default Serdes"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 0b7c1b3..07ad044 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -55,7 +54,6 @@ import java.io.File;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.function.Supplier;
@@ -777,7 +775,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record);
         }
     }
@@ -794,7 +792,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record.withTimestamp(record.timestamp() + 10));
         }
     }
@@ -816,7 +814,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record);
             context.forward(record.withTimestamp(record.timestamp() + 5), firstChild);
             context.forward(record, secondChild);
@@ -833,7 +831,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record.withHeaders(record.headers().add(HEADER)));
         }
     }
@@ -851,7 +849,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record.withValue(record.value().split("@")[0]));
         }
     }
@@ -936,7 +934,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             store.put(record.key(), record.value());
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index bc6f08b..c877ab5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -52,7 +52,7 @@ public class SinkNodeTest {
         // When/Then
         context.setTime(-1); // ensures a negative timestamp is set for the record we send next
         try {
-            illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()), java.util.Optional.empty());
+            illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()));
             fail("Should have thrown StreamsException");
         } catch (final StreamsException ignored) {
             // expected
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index b54aa6c..e620be7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -80,7 +79,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -135,7 +133,7 @@ public class StreamTaskTest {
     private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
         @Override
-        public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<Integer, Integer> record) {
             throw new RuntimeException("KABOOM!");
         }
 
@@ -471,7 +469,7 @@ public class StreamTaskTest {
             }
 
             @Override
-            public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+            public void process(final Record<Integer, Integer> record) {
                 if (record.key() % 2 == 0) {
                     context.forward(record);
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6520778..4ee44ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -64,7 +64,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -94,7 +93,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -1205,7 +1203,7 @@ public class StreamThreadTest {
             "proc",
             () -> new Processor<Object, Object, Object, Object>() {
                 @Override
-                public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<Object, Object> record) {
                     if (shouldThrow.get()) {
                         throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
                     } else {
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index bd82e29..6086f97 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -55,7 +55,6 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
 
@@ -300,7 +299,7 @@ public class InternalMockProcessorContext
         try {
             for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
                 currentNode = childNode;
-                ((ProcessorNode<K, V, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+                ((ProcessorNode<K, V, ?, ?>) childNode).process(record);
             }
         } finally {
             currentNode = thisNode;
@@ -337,7 +336,7 @@ public class InternalMockProcessorContext
                 if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
                     currentNode = childNode;
                     final Record<Object, Object> record = new Record<>(key, value, toInternal.timestamp(), headers());
-                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record);
                     toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
                                            // Processors and toInternal might have been modified
                 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index 8bed338..dd56bad 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.time.Duration;
@@ -30,7 +29,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -74,7 +72,7 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         final KIn key = record.key();
         final VIn value = record.value();
         final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 82b24d1..370dca7 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.io.File;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
@@ -84,6 +86,11 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     }
 
     @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.of(recordContext());
+    }
+
+    @Override
     public void setRecordContext(final ProcessorRecordContext recordContext) {
         setRecordMetadata(
             recordContext.topic(),
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 6c653c3..f18b763 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -22,17 +22,14 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     private final MockApiProcessor<K, V, Object, Object> delegate;
-    private InternalProcessorContext internalProcessorContext;
 
     public MockProcessor(final PunctuationType punctuationType,
                          final long scheduleInterval) {
@@ -47,14 +44,12 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     @Override
     public void init(final ProcessorContext context) {
         super.init(context);
-        internalProcessorContext = (InternalProcessorContext) context;
         delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context);
     }
 
     @Override
     public void process(final K key, final V value) {
-        final Record<K, V> record = new Record<>(key, value, context.timestamp(), context.headers());
-        delegate.process(record, Optional.ofNullable(internalProcessorContext.recordContext()));
+        delegate.process(new Record<>(key, value, context.timestamp(), context.headers()));
     }
 
     public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 90fd905..a75c250 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -18,12 +18,10 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 import java.util.Collections;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -61,8 +59,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
-        processor().process(record, recordMetadata);
+    public void process(final Record<KIn, VIn> record) {
+        processor().process(record);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 4c3fed1..9d22e3b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -18,12 +18,10 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
 import java.util.ArrayList;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
@@ -42,7 +40,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         numReceived++;
         keys.add(record.key());
         values.add(record.value());
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 48783a6..fd9fb76 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -72,7 +72,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -266,14 +265,14 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<Object, Object> record) {
             processedRecords.add(new TTDTestRecord(
                 record.key(),
                 record.value(),
                 record.headers(),
                 record.timestamp(),
-                recordMetadata.map(RecordMetadata::offset).orElse(-1L),
-                recordMetadata.map(RecordMetadata::topic).orElse(null)
+                context.recordMetadata().map(RecordMetadata::offset).orElse(-1L),
+                context.recordMetadata().map(RecordMetadata::topic).orElse(null)
             ));
             context.forward(record);
         }
@@ -408,7 +407,7 @@ public class TopologyTestDriverTest {
                     }
 
                     @Override
-                    public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+                    public void process(final Record<Object, Object> record) {
                         store.put(record.key(), record.value());
                     }
                 }
@@ -1461,7 +1460,7 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, Long> record) {
             final Long oldValue = store.get(record.key());
             if (oldValue == null || record.value() > oldValue) {
                 store.put(record.key(), record.value());
@@ -1514,7 +1513,7 @@ public class TopologyTestDriverTest {
                         }
 
                         @Override
-                        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+                        public void process(final Record<String, Long> record) {
                             store.put(record.key(), record.value());
                         }
                     };
@@ -1703,7 +1702,7 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     final String value = record.value();
                     if (!value.startsWith("recurse-")) {
                         context.forward(record.withValue("recurse-" + value), "recursiveSink");
@@ -1761,7 +1760,7 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     stateStore.put(record.key(), record.value());
                 }
             }
@@ -1777,7 +1776,7 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     final String value = record.value();
                     if (!value.startsWith("recurse-")) {
                         context.forward(record.withValue("recurse-" + value), "recursiveSink");


[kafka] 01/02: KAFKA-10535: Split ProcessorContext into Processor/StateStore/RecordContext

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-4-record-processor
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 22b9c3ab92dcaac8afae628547500e043e44de87
Author: John Roesler <vv...@apache.org>
AuthorDate: Tue Aug 25 16:51:15 2020 -0500

    KAFKA-10535: Split ProcessorContext into Processor/StateStore/RecordContext
---
 checkstyle/suppressions.xml                        |   2 +-
 .../examples/docs/DeveloperGuideTesting.java       |  20 +-
 .../kafka/streams/processor/ProcessorContext.java  |   5 +-
 .../apache/kafka/streams/processor/StateStore.java |  31 ++-
 .../kafka/streams/processor/StateStoreContext.java | 112 ++++++++++
 .../kafka/streams/processor/api/Processor.java     |  14 +-
 .../streams/processor/api/ProcessorContext.java    |  87 +-------
 .../apache/kafka/streams/processor/api/Record.java | 160 +++++++++++++
 .../ToInternal.java => api/RecordMetadata.java}    |  37 ++-
 .../internals/AbstractProcessorContext.java        |  65 +++---
 .../ForwardingDisabledProcessorContext.java        |   2 +-
 .../internals/GlobalProcessorContextImpl.java      |  30 ++-
 .../internals/GlobalStateManagerImpl.java          |   3 +-
 .../processor/internals/GlobalStateUpdateTask.java |  13 +-
 .../internals/InternalApiProcessorContext.java     | 119 ----------
 .../internals/InternalProcessorContext.java        |   5 +-
 .../processor/internals/ProcessorAdapter.java      |  24 +-
 .../internals/ProcessorContextAdapter.java         | 235 -------------------
 .../processor/internals/ProcessorContextImpl.java  | 138 +++++++-----
 .../internals/ProcessorContextReverseAdapter.java  | 248 ---------------------
 .../streams/processor/internals/ProcessorNode.java |  15 +-
 .../internals/ProcessorRecordContext.java          |   3 +-
 .../processor/internals/ProcessorStateManager.java |   3 +-
 .../streams/processor/internals/SinkNode.java      |  40 +++-
 .../streams/processor/internals/SourceNode.java    |   8 +-
 ...xt.java => StoreToProcessorContextAdapter.java} |  70 +++---
 .../streams/processor/internals/StreamTask.java    |  40 +++-
 .../streams/processor/internals/ToInternal.java    |   4 +
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  15 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |   7 +-
 .../org/apache/kafka/streams/TopologyTest.java     |  10 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   5 +-
 .../kstream/internals/KStreamTransformTest.java    |   5 +-
 .../KTableSuppressProcessorMetricsTest.java        |   3 +-
 .../suppress/KTableSuppressProcessorTest.java      |   3 +-
 .../internals/AbstractProcessorContextTest.java    |  46 ++--
 .../internals/GlobalProcessorContextImplTest.java  |  19 +-
 .../processor/internals/GlobalStateTaskTest.java   |   9 +-
 .../internals/GlobalStreamThreadTest.java          |   4 +-
 .../internals/ProcessorContextImplTest.java        |   2 +-
 .../processor/internals/ProcessorNodeTest.java     |   9 +-
 .../internals/ProcessorStateManagerTest.java       |   5 +-
 .../processor/internals/ProcessorTopologyTest.java |  35 +--
 .../streams/processor/internals/SinkNodeTest.java  |   4 +-
 .../processor/internals/StreamTaskTest.java        |  16 +-
 .../processor/internals/StreamThreadTest.java      |   6 +-
 .../internals/testutil/ConsumerRecordUtil.java     |  46 ++++
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  13 +-
 .../internals/AbstractSessionBytesStoreTest.java   |   9 +-
 .../internals/AbstractWindowBytesStoreTest.java    |  13 +-
 .../state/internals/CachingKeyValueStoreTest.java  |   5 +-
 .../state/internals/CachingSessionStoreTest.java   |   5 +-
 .../state/internals/CachingWindowStoreTest.java    |   5 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   3 +-
 .../ChangeLoggingSessionBytesStoreTest.java        |   6 +-
 ...geLoggingTimestampedKeyValueBytesStoreTest.java |   3 +-
 ...angeLoggingTimestampedWindowBytesStoreTest.java |   6 +-
 .../ChangeLoggingWindowBytesStoreTest.java         |   6 +-
 .../CompositeReadOnlyKeyValueStoreTest.java        |   3 +-
 .../internals/GlobalStateStoreProviderTest.java    |   3 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |   5 +-
 .../state/internals/MeteredSessionStoreTest.java   |   5 +-
 .../MeteredTimestampedKeyValueStoreTest.java       |   9 +-
 .../MeteredTimestampedWindowStoreTest.java         |  11 +-
 .../state/internals/MeteredWindowStoreTest.java    |  25 ++-
 .../streams/state/internals/RocksDBStoreTest.java  |  39 ++--
 .../internals/RocksDBTimestampedStoreTest.java     |  15 +-
 .../state/internals/RocksDBWindowStoreTest.java    |   7 +-
 .../state/internals/SegmentIteratorTest.java       |   5 +-
 .../internals/TimeOrderedKeyValueBufferTest.java   |  37 +--
 .../kafka/test/InternalMockProcessorContext.java   |  32 ++-
 .../org/apache/kafka/test/MockApiProcessor.java    |  23 +-
 .../kafka/test/MockInternalProcessorContext.java   |  12 +
 .../java/org/apache/kafka/test/MockProcessor.java  |  11 +-
 .../org/apache/kafka/test/MockProcessorNode.java   |   7 +-
 .../java/org/apache/kafka/test/MockSourceNode.java |  13 +-
 .../apache/kafka/test/NoOpProcessorContext.java    |  15 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  18 +-
 .../streams/processor/MockProcessorContext.java    |   5 +-
 .../kafka/streams/TopologyTestDriverTest.java      | 127 ++++++-----
 80 files changed, 1107 insertions(+), 1176 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dead182..dd074a4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -185,7 +185,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="StreamThreadTest.java"/>
+              files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
 
     <suppress checks="MethodLength"
               files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 90bed05..1ce3445 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.examples.docs;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
@@ -26,6 +27,8 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -37,6 +40,7 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -160,24 +164,24 @@ public class DeveloperGuideTesting {
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
-            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
-            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
+            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
             store = context.getStateStore("aggStore");
         }
 
         @Override
-        public void process(final String key, final Long value) {
-            final Long oldValue = store.get(key);
-            if (oldValue == null || value > oldValue) {
-                store.put(key, value);
+        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+            final Long oldValue = store.get(record.key());
+            if (oldValue == null || record.value() > oldValue) {
+                store.put(record.key(), record.value());
             }
         }
 
-        private void flushStore() {
+        private void flushStore(final long timestamp) {
             final KeyValueIterator<String, Long> it = store.all();
             while (it.hasNext()) {
                 final KeyValue<String, Long> next = it.next();
-                context.forward(next.key, next.value);
+                context.forward(new Record<>(next.key, next.value, timestamp, new RecordHeaders()));
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index f036869..aafe64e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -88,9 +88,12 @@ public interface ProcessorContext {
      * Get the state store given the store name.
      *
      * @param name The store name
+     * @param <S> The type or interface of the store to return
      * @return The state store instance
+     *
+     * @throws ClassCastException if the return type isn't a type or interface of the actual returned store.
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index df53ee2..4f47b12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -25,7 +27,7 @@ import org.apache.kafka.streams.errors.StreamsException;
  * all data into this store directory.
  * The store directory must be created with the state directory.
  * The state directory can be obtained via {@link ProcessorContext#stateDir() #stateDir()} using the
- * {@link ProcessorContext} provided via {@link #init(ProcessorContext, StateStore) init(...)}.
+ * {@link StateStoreContext} provided via {@link #init(StateStoreContext, StateStore) init(...)}.
  * <p>
  * Using nested store directories within the state directory isolates different state stores.
  * If a state store would write into the state directory directly, it might conflict with others state stores and thus,
@@ -49,7 +51,28 @@ public interface StateStore {
      * Initializes this state store.
      * <p>
      * The implementation of this function must register the root store in the context via the
-     * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+     * the second parameter should be an object of user's implementation
+     * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users can implement another
+     * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+     * let users implement bulk-load restoration logic instead of restoring one record at a time.
+     * <p>
+     * This method is not called if {@link StateStore#init(StateStoreContext, StateStore)}
+     * is implemented.
+     *
+     * @throws IllegalStateException If store gets registered after initialization is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
+
+    /**
+     * Initializes this state store.
+     * <p>
+     * The implementation of this function must register the root store in the context via the
+     * {@link StateStoreContext#register(StateStore, StateRestoreCallback)} function, where the
      * first {@link StateStore} parameter should always be the passed-in {@code root} object, and
      * the second parameter should be an object of user's implementation
      * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
@@ -61,7 +84,9 @@ public interface StateStore {
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final StateStoreContext context, final StateStore root) {
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
 
     /**
      * Flush any cached data
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
new file mode 100644
index 0000000..43810a2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {
+
+    /**
+     * Returns the application id.
+     *
+     * @return the application id
+     */
+    String applicationId();
+
+    /**
+     * Returns the task id.
+     *
+     * @return the task id
+     */
+    TaskId taskId();
+
+    /**
+     * Returns the default key serde.
+     *
+     * @return the default key serde
+     */
+    Serde<?> keySerde();
+
+    /**
+     * Returns the default value serde.
+     *
+     * @return the default value serde
+     */
+    Serde<?> valueSerde();
+
+    /**
+     * Returns the state directory for the partition.
+     *
+     * @return the state directory
+     */
+    File stateDir();
+
+    /**
+     * Returns Metrics instance.
+     *
+     * @return StreamsMetrics
+     */
+    StreamsMetrics metrics();
+
+    /**
+     * Registers and possibly restores the specified storage engine.
+     *
+     * @param store the storage engine
+     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
+     *
+     * @throws IllegalStateException If store gets registered after initialization is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void register(final StateStore store,
+                  final StateRestoreCallback stateRestoreCallback);
+
+    /**
+     * Returns all the application config properties as key/value pairs.
+     *
+     * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated to the StateStoreContext.
+     *
+     * <p> The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property
+     * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG}
+     * will be of type {@link Class}, even if it was specified as a String to
+     * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}).
+     *
+     * @return all the key/values from the StreamsConfig properties
+     */
+    Map<String, Object> appConfigs();
+
+    /**
+     * Returns all the application config properties with the given key prefix, as key/value pairs
+     * stripping the prefix.
+     *
+     * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated to the StateStoreContext.
+     *
+     * @param prefix the properties prefix
+     * @return the key/values matching the given prefix from the StreamsConfig properties.
+     */
+    Map<String, Object> appConfigsWithPrefix(final String prefix);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
index d3656c7..e6feccb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.time.Duration;
+import java.util.Optional;
 
 /**
  * A processor of key-value pair records.
@@ -46,12 +47,17 @@ public interface Processor<KIn, VIn, KOut, VOut> {
     default void init(final ProcessorContext<KOut, VOut> context) {}
 
     /**
-     * Process the record with the given key and value.
+     * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
      *
-     * @param key the key for the record
-     * @param value the value for the record
+     * @param record the record to process
+     * @param recordMetadata the metadata of the record, if it is defined. Note that as long as the processor is
+     *                       receiving a record downstream of a Source (i.e., the current record is coming from an
+     *                       input topic), the metadata is defined. On the other hand, if a parent processor has
+     *                       registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
+     *                       punctuator, then there is no record from an input topic, and therefore the metadata
+     *                       would be undefined.
      */
-    void process(KIn key, VIn value);
+    void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata);
 
     /**
      * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index 958126a..f4f0fdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -16,22 +16,19 @@
  */
 package org.apache.kafka.streams.processor.api;
 
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Processor context interface.
@@ -84,29 +81,20 @@ public interface ProcessorContext<KForward, VForward> {
     StreamsMetrics metrics();
 
     /**
-     * Registers and possibly restores the specified storage engine.
-     *
-     * @param store the storage engine
-     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
-     *
-     * @throws IllegalStateException If store gets registered after initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
-    void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
-
-    /**
      * Get the state store given the store name.
      *
      * @param name The store name
+     * @param <S> The type or interface of the store to return
      * @return The state store instance
+     *
+     * @throws ClassCastException if the return type isn't a type or interface of the actual returned store.
      */
     <S extends StateStore> S getStateStore(final String name);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
      * {@link Processor#init(ProcessorContext) initialization} or
-     * {@link Processor#process(Object, Object) processing} to
+     * {@link Processor#process(Record, Optional) processing} to
      * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
      * The type parameter controls what notion of time is used for punctuation:
      * <ul>
@@ -140,23 +128,19 @@ public interface ProcessorContext<KForward, VForward> {
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
 
     /**
      * Requests a commit.
@@ -164,53 +148,6 @@ public interface ProcessorContext<KForward, VForward> {
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the offset
-     */
-    long offset();
-
-    /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the headers
-     */
-    Headers headers();
-
-    /**
-     * Returns the current timestamp.
-     *
-     * <p> If it is triggered while processing a record streamed from the source processor,
-     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
-     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
-     *
-     * <p> If it is triggered while processing a record generated not from the source processor (for example,
-     * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the largest timestamp of any record processed by the task.
-     *
-     * @return the timestamp
-     */
-    long timestamp();
-
-    /**
      * Returns all the application config properties as key/value pairs.
      *
      * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
new file mode 100644
index 0000000..3be74f8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link #headers()} to return a non-null, empty, {@link Headers} collection.
+     *
+     * @throws StreamsException wrapping an {@link IllegalArgumentException} if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }
+        this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);
+    }
+
+    /**
+     * Convenience constructor in case you do not wish to specify any headers.
+     * Subsequent calls to {@link #headers()} will return a non-null, empty, {@link Headers} collection.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     *
+     * @throws StreamsException wrapping an {@link IllegalArgumentException} if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp) {
+        // Delegate to the full constructor so the negative-timestamp check is enforced here
+        // too (previously this constructor silently accepted negative timestamps). The full
+        // constructor documents that null headers result in a non-null, empty collection.
+        this(key, value, timestamp, null);
+    }
+
+    /**
+     * The key of the record. May be null.
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * The value of the record. May be null.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * The timestamp of the record. Will never be negative.
+     */
+    public long timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * The headers of the record. Never null.
+     */
+    public Headers headers() {
+        return headers;
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the key.
+     *
+     * Copies the attributes of this record with the key replaced.
+     *
+     * @param key The key of the result record. May be null.
+     * @param <NewK> The type of the new record's key.
+     * @return A new Record instance with all the same attributes (except that the key is replaced).
+     */
+    public <NewK> Record<NewK, V> withKey(final NewK key) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the value.
+     *
+     * Copies the attributes of this record with the value replaced.
+     *
+     * @param value The value of the result record.
+     * @param <NewV> The type of the new record's value.
+     * @return A new Record instance with all the same attributes (except that the value is replaced).
+     */
+    public <NewV> Record<K, NewV> withValue(final NewV value) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the timestamp.
+     *
+     * Copies the attributes of this record with the timestamp replaced.
+     *
+     * @param timestamp The timestamp of the result record.
+     * @return A new Record instance with all the same attributes (except that the timestamp is replaced).
+     */
+    public Record<K, V> withTimestamp(final long timestamp) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+
+    /**
+     * A convenient way to produce a new record if you only need to change the headers.
+     *
+     * Copies the attributes of this record with the headers replaced.
+     *
+     * @param headers The headers of the result record.
+     * @return A new Record instance with all the same attributes (except that the headers are replaced).
+     */
+    public Record<K, V> withHeaders(final Headers headers) {
+        return new Record<>(key, value, timestamp, headers);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
similarity index 63%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
index 6c5798e..532104a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
@@ -14,28 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.api;
 
-import org.apache.kafka.streams.processor.To;
+public interface RecordMetadata {
+    /**
+     * @return  The topic of the original record received from Kafka
+     */
+    String topic();
 
-public class ToInternal extends To {
-    public ToInternal() {
-        super(To.all());
-    }
+    /**
+     * @return  The partition of the original record received from Kafka
+     */
+    int partition();
 
-    public void update(final To to) {
-        super.update(to);
-    }
-
-    public boolean hasTimestamp() {
-        return timestamp != -1;
-    }
-
-    public long timestamp() {
-        return timestamp;
-    }
-
-    public String child() {
-        return childName;
-    }
-}
\ No newline at end of file
+    /**
+     * @return  The offset of the original record received from Kafka
+     */
+    long offset();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 6012817..ef222e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -33,7 +34,6 @@ import java.util.Objects;
 
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
-    public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
@@ -112,64 +112,69 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         stateManager().registerStore(store, stateRestoreCallback);
     }
 
-    /**
-     * @throws IllegalStateException if the task's record is null
-     */
     @Override
     public String topic() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
-        }
-
-        final String topic = recordContext.topic();
-
-        if (NONEXIST_TOPIC.equals(topic)) {
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For topic, the dummy value is `null`.
             return null;
+        } else {
+            return recordContext.topic();
         }
-
-        return topic;
     }
 
-    /**
-     * @throws IllegalStateException if partition is null
-     */
     @Override
     public int partition() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For partition, the dummy value is `-1`.
+            return -1;
+        } else {
+            return recordContext.partition();
         }
-
-        return recordContext.partition();
     }
 
-    /**
-     * @throws IllegalStateException if offset is null
-     */
     @Override
     public long offset() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For offset, the dummy value is `-1L`.
+            return -1L;
+        } else {
+            return recordContext.offset();
         }
-        return recordContext.offset();
     }
 
     @Override
     public Headers headers() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For headers, the dummy value is an empty headers collection.
+            return new RecordHeaders();
+        } else {
+            return recordContext.headers();
         }
-        return recordContext.headers();
     }
 
-    /**
-     * @throws IllegalStateException if timestamp is null
-     */
     @Override
     public long timestamp() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For timestamp, the dummy value is `0L`.
+            return 0L;
+        } else {
+            return recordContext.timestamp();
         }
-        return recordContext.timestamp();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 2b8043a..5e654c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -86,7 +86,7 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         return delegate.getStateStore(name);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 695eb77..9f31309 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
-
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.Cancellable;
@@ -26,11 +24,16 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+import java.util.Optional;
+
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
@@ -49,26 +52,52 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         return stateManager;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         final StateStore store = stateManager.getGlobalStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
+    /**
+     * Forwards {@code record} to every child of the current node, passing along the current
+     * record context (absent if none is set) as metadata. The previous current node is
+     * restored afterwards, even if a child throws.
+     */
     @SuppressWarnings("unchecked")
     @Override
-    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+    public <K, V> void forward(final Record<K, V> record) {
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
+        final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
         try {
             for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
+                ((ProcessorNode<K, V, ?, ?>) child).process(record, recordMetadata);
             }
         } finally {
             setCurrentNode(previousNode);
         }
     }
 
+    /**
+     * Not supported: this context never forwards to a named child.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
+    }
+
+    /**
+     * Wraps {@code key} and {@code value} in a {@link Record} carrying this context's
+     * {@code timestamp()} and {@code headers()} and delegates to {@link #forward(Record)}.
+     */
+    @Override
+    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+        forward(new Record<>(key, value, timestamp(), headers()));
+    }
+
     /**
      * No-op. This should only be called on GlobalStateStore#flush and there should be no child nodes
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2864415..bd3aa3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
@@ -147,7 +148,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+            stateStore.init((StateStoreContext) globalProcessorContext, stateStore);
         }
 
         // make sure each topic-partition from checkpointFileCache is associated with a global state store
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 3664137..360e50e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -22,11 +22,13 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.Record;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
@@ -104,7 +106,16 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                     deserialized.headers());
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
+            final Record<Object, Object> toProcess = new Record<>(
+                deserialized.key(),
+                deserialized.value(),
+                processorContext.timestamp(),
+                processorContext.headers()
+            );
+            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(
+                toProcess,
+                Optional.of(recordContext)
+            );
         }
 
         offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
deleted file mode 100644
index 39c3084..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.RecordContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-/**
- * For internal use so we can update the {@link RecordContext} and current
- * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
- * {@link ThreadCache}
- */
-public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {
-
-    @Override
-    StreamsMetricsImpl metrics();
-
-    /**
-     * @param timeMs current wall-clock system timestamp in milliseconds
-     */
-    void setSystemTimeMs(long timeMs);
-
-    /**
-     * @return the current wall-clock system timestamp in milliseconds
-     */
-    long currentSystemTimeMs();
-
-    /**
-     * Returns the current {@link RecordContext}
-     * @return the current {@link RecordContext}
-     */
-    ProcessorRecordContext recordContext();
-
-    /**
-     * @param recordContext the {@link ProcessorRecordContext} for the record about to be processes
-     */
-    void setRecordContext(ProcessorRecordContext recordContext);
-
-    /**
-     * @param currentNode the current {@link ProcessorNode}
-     */
-    void setCurrentNode(ProcessorNode<?, ?, ?, ?> currentNode);
-
-    /**
-     * Get the current {@link ProcessorNode}
-     */
-    ProcessorNode<?, ?, ?, ?> currentNode();
-
-    /**
-     * Get the thread-global cache
-     */
-    ThreadCache cache();
-
-    /**
-     * Mark this context as being initialized
-     */
-    void initialize();
-
-    /**
-     * Mark this context as being uninitialized
-     */
-    void uninitialize();
-
-    /**
-     * @return the type of task (active/standby/global) that this context corresponds to
-     */
-    TaskType taskType();
-
-    /**
-     * Transition to active task and register a new task and cache to this processor context
-     */
-    void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache);
-
-    /**
-     * Transition to standby task and register a dummy cache to this processor context
-     */
-    void transitionToStandby(final ThreadCache newCache);
-
-    /**
-     * Register a dirty entry flush listener for a particular namespace
-     */
-    void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener);
-
-    /**
-     * Get a correctly typed state store, given a handle on the original builder.
-     */
-    @SuppressWarnings("unchecked")
-    default <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
-        return (T) getStateStore(builder.name());
-    }
-
-    void logChange(final String storeName,
-                   final Bytes key,
-                   final byte[] value,
-                   final long timestamp);
-
-    String changelogFor(final String storeName);
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 8e4ec25..f4b922b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
  * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext extends ProcessorContext {
+public interface InternalProcessorContext
+    extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, StateStoreContext {
+
     BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
     ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index d8e4af4..291a99e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -19,9 +19,14 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+
+import java.util.Optional;
 
 public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
     private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
+    private InternalProcessorContext context;
 
     public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
         if (delegate == null) {
@@ -47,12 +52,25 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
     @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext<KOut, VOut> context) {
-        delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+        this.context = (InternalProcessorContext) context;
+        delegate.init((org.apache.kafka.streams.processor.ProcessorContext) context);
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        delegate.process(key, value);
+    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+        final ProcessorRecordContext processorRecordContext = context.recordContext();
+        try {
+            context.setRecordContext(new ProcessorRecordContext(
+                record.timestamp(),
+                context.offset(),
+                context.partition(),
+                context.topic(),
+                record.headers()
+            ));
+            delegate.process(record.key(), record.value());
+        } finally {
+            context.setRecordContext(processorRecordContext);
+        }
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
deleted file mode 100644
index 85dbd52..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextAdapter<KForward, VForward>
-    implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {
-
-    private final InternalProcessorContext delegate;
-
-    @SuppressWarnings("unchecked")
-    public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) {
-        if (delegate instanceof ProcessorContextReverseAdapter) {
-            return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
-        } else {
-            return new ProcessorContextAdapter<>(delegate);
-        }
-    }
-
-    private ProcessorContextAdapter(final InternalProcessorContext delegate) {
-        this.delegate = delegate;
-    }
-
-    @Override
-    public String applicationId() {
-        return delegate.applicationId();
-    }
-
-    @Override
-    public TaskId taskId() {
-        return delegate.taskId();
-    }
-
-    @Override
-    public Serde<?> keySerde() {
-        return delegate.keySerde();
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return delegate.valueSerde();
-    }
-
-    @Override
-    public File stateDir() {
-        return delegate.stateDir();
-    }
-
-    @Override
-    public StreamsMetricsImpl metrics() {
-        return delegate.metrics();
-    }
-
-    @Override
-    public void setSystemTimeMs(final long timeMs) {
-        delegate.setSystemTimeMs(timeMs);
-    }
-
-    @Override
-    public long currentSystemTimeMs() {
-        return delegate.currentSystemTimeMs();
-    }
-
-    @Override
-    public ProcessorRecordContext recordContext() {
-        return delegate.recordContext();
-    }
-
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        delegate.setRecordContext(recordContext);
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
-        delegate.setCurrentNode(currentNode);
-    }
-
-    @Override
-    public ProcessorNode<?, ?, ?, ?> currentNode() {
-        return delegate.currentNode();
-    }
-
-    @Override
-    public ThreadCache cache() {
-        return delegate.cache();
-    }
-
-    @Override
-    public void initialize() {
-        delegate.initialize();
-    }
-
-    @Override
-    public void uninitialize() {
-        delegate.uninitialize();
-    }
-
-    @Override
-    public Task.TaskType taskType() {
-        return delegate.taskType();
-    }
-
-    @Override
-    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
-        delegate.transitionToActive(streamTask, recordCollector, newCache);
-    }
-
-    @Override
-    public void transitionToStandby(final ThreadCache newCache) {
-        delegate.transitionToStandby(newCache);
-    }
-
-    @Override
-    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
-        delegate.registerCacheFlushListener(namespace, listener);
-    }
-
-    @Override
-    public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
-        return delegate.getStateStore(builder);
-    }
-
-    @Override
-    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
-        delegate.logChange(storeName, key, value, timestamp);
-    }
-
-    @Override
-    public String changelogFor(final String storeName) {
-        return delegate.changelogFor(storeName);
-    }
-
-    @Override
-    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
-        delegate.register(store, stateRestoreCallback);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <S extends StateStore> S getStateStore(final String name) {
-        return (S) delegate.getStateStore(name);
-    }
-
-    @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
-        return delegate.schedule(interval, type, callback);
-    }
-
-    @Override
-    public <K extends KForward, V extends VForward> void forward(final K key, final V value) {
-        delegate.forward(key, value);
-    }
-
-    @Override
-    public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) {
-        delegate.forward(key, value, to);
-    }
-
-    @Override
-    public void commit() {
-        delegate.commit();
-    }
-
-    @Override
-    public String topic() {
-        return delegate.topic();
-    }
-
-    @Override
-    public int partition() {
-        return delegate.partition();
-    }
-
-    @Override
-    public long offset() {
-        return delegate.offset();
-    }
-
-    @Override
-    public Headers headers() {
-        return delegate.headers();
-    }
-
-    @Override
-    public long timestamp() {
-        return delegate.timestamp();
-    }
-
-    @Override
-    public Map<String, Object> appConfigs() {
-        return delegate.appConfigs();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
-        return delegate.appConfigsWithPrefix(prefix);
-    }
-
-    InternalProcessorContext delegate() {
-        return delegate;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index e12dfe1..309b813 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
@@ -30,13 +27,18 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.List;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
@@ -47,9 +49,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     private StreamTask streamTask;
     private RecordCollector collector;
 
-    private final ToInternal toInternal = new ToInternal();
-    private final static To SEND_TO_ALL = To.all();
-
     private final ProcessorStateManager stateManager;
 
     final Map<String, DirtyEntryFlushListener> cacheNameToFlushListener = new HashMap<>();
@@ -135,8 +134,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
      * @throws UnsupportedOperationException if the current streamTask type is standby
      */
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         throwUnsupportedOperationExceptionIfStandby("getStateStore");
         if (currentNode() == null) {
             throw new StreamsException("Accessing from an unknown node");
@@ -144,7 +144,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         final StateStore globalStore = stateManager.getGlobalStore(name);
         if (globalStore != null) {
-            return getReadOnlyStore(globalStore);
+            return (S) getReadOnlyStore(globalStore);
         }
 
         if (!currentNode().stateStores.contains(name)) {
@@ -159,84 +159,135 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
-    }
-
-    @Override
-    public <K, V> void forward(final K key,
-                               final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
-    }
-
-    @Override
-    @Deprecated
-    public <K, V> void forward(final K key,
-                               final V value,
-                               final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
-            key,
-            value,
-            To.child((currentNode().children()).get(childIndex).name()));
+        return (S) getReadWriteStore(store);
     }
 
     @Override
-    @Deprecated
-    public <K, V> void forward(final K key,
-                               final V value,
-                               final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
     }
 
+    /**
+     * Forwards {@code record} to the child named {@code childName}, or to every child of
+     * the current node when {@code childName} is {@code null}.
+     *
+     * @throws StreamsException if {@code childName} is not a child of the current node
+     * @throws UnsupportedOperationException if the current task is a standby
+     */
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final K key,
-                               final V value,
-                               final To to) {
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
         throwUnsupportedOperationExceptionIfStandby("forward");
+
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         final ProcessorRecordContext previousContext = recordContext;
 
         try {
-            toInternal.update(to);
-            if (toInternal.hasTimestamp()) {
+            // Keep the record context in step with the record being forwarded: rebuild it when
+            // the record's timestamp or headers differ. The headers check is by reference, so a
+            // spurious mismatch only costs an allocation. A null context is left undefined.
+            if (recordContext != null
+                && (record.timestamp() != recordContext.timestamp() || record.headers() != recordContext.headers())) {
                 recordContext = new ProcessorRecordContext(
-                    toInternal.timestamp(),
+                    record.timestamp(),
                     recordContext.offset(),
                     recordContext.partition(),
                     recordContext.topic(),
-                    recordContext.headers());
+                    record.headers());
             }
 
-            final String sendTo = toInternal.child();
-            if (sendTo == null) {
+            if (childName == null) {
                 final List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
                 for (final ProcessorNode<?, ?, ?, ?> child : children) {
-                    forward((ProcessorNode<K, V, ?, ?>) child, key, value);
+                    forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
                 }
             } else {
-                final ProcessorNode<?, ?, ?, ?> child = currentNode().getChild(sendTo);
+                final ProcessorNode<?, ?, ?, ?> child = currentNode().getChild(childName);
                 if (child == null) {
-                    throw new StreamsException("Unknown downstream node: " + sendTo
-                        + " either does not exist or is not connected to this processor.");
+                    throw new StreamsException("Unknown downstream node: " + childName
+                                                   + " either does not exist or is not connected to this processor.");
                 }
-                forward((ProcessorNode<K, V, ?, ?>) child, key, value);
+                forwardInternal((ProcessorNode<K, V, ?, ?>) child, record);
             }
+
         } finally {
             recordContext = previousContext;
             setCurrentNode(previousNode);
         }
     }
 
-    private <K, V> void forward(final ProcessorNode<K, V, ?, ?> child,
-                                final K key,
-                                final V value) {
+    @Override
+    public <K, V> void forward(final K key,
+                               final V value) {
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
+    }
+
+    @Override
+    @Deprecated
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final int childIndex) {
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
+    }
+
+    @Override
+    @Deprecated
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final String childName) {
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
+    }
+
+    /**
+     * Legacy forward: the timestamp in {@code to}, if set, overrides this context's
+     * timestamp; the child named in {@code to}, if any, is the only recipient.
+     */
+    @Override
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    /**
+     * Makes {@code child} the current node, lets it process {@code record}, and reports
+     * end-to-end latency from the record's timestamp when the child is a terminal node.
+     */
+    private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
+                                        final Record<K, V> record) {
         setCurrentNode(child);
-        child.process(key, value);
+
+        final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
+
+        child.process(record, recordMetadata);
+
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());
+            streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
deleted file mode 100644
index 6e82a5e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
-
-    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
-        if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
-        } else {
-            return new ProcessorContextReverseAdapter(delegate);
-        }
-    }
-
-    private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
-        this.delegate = delegate;
-    }
-
-    @Override
-    public String applicationId() {
-        return delegate.applicationId();
-    }
-
-    @Override
-    public TaskId taskId() {
-        return delegate.taskId();
-    }
-
-    @Override
-    public Serde<?> keySerde() {
-        return delegate.keySerde();
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return delegate.valueSerde();
-    }
-
-    @Override
-    public File stateDir() {
-        return delegate.stateDir();
-    }
-
-    @Override
-    public StreamsMetricsImpl metrics() {
-        return delegate.metrics();
-    }
-
-    @Override
-    public void setSystemTimeMs(final long timeMs) {
-        delegate.setSystemTimeMs(timeMs);
-    }
-
-    @Override
-    public long currentSystemTimeMs() {
-        return delegate.currentSystemTimeMs();
-    }
-
-    @Override
-    public ProcessorRecordContext recordContext() {
-        return delegate.recordContext();
-    }
-
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        delegate.setRecordContext(recordContext);
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
-        delegate.setCurrentNode(currentNode);
-    }
-
-    @Override
-    public ProcessorNode<?, ?, ?, ?> currentNode() {
-        return delegate.currentNode();
-    }
-
-    @Override
-    public ThreadCache cache() {
-        return delegate.cache();
-    }
-
-    @Override
-    public void initialize() {
-        delegate.initialize();
-    }
-
-    @Override
-    public void uninitialize() {
-        delegate.uninitialize();
-    }
-
-    @Override
-    public Task.TaskType taskType() {
-        return delegate.taskType();
-    }
-
-    @Override
-    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
-        delegate.transitionToActive(streamTask, recordCollector, newCache);
-    }
-
-    @Override
-    public void transitionToStandby(final ThreadCache newCache) {
-        delegate.transitionToStandby(newCache);
-    }
-
-    @Override
-    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
-        delegate.registerCacheFlushListener(namespace, listener);
-    }
-
-    @Override
-    public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
-        return delegate.getStateStore(builder);
-    }
-
-    @Override
-    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
-        delegate.logChange(storeName, key, value, timestamp);
-    }
-
-    @Override
-    public String changelogFor(final String storeName) {
-        return delegate.changelogFor(storeName);
-    }
-
-    @Override
-    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
-        delegate.register(store, stateRestoreCallback);
-    }
-
-    @Override
-    public StateStore getStateStore(final String name) {
-        return delegate.getStateStore(name);
-    }
-
-    @Deprecated
-    @Override
-    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
-        return delegate.schedule(Duration.ofMillis(intervalMs), type, callback);
-    }
-
-    @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
-        return delegate.schedule(interval, type, callback);
-    }
-
-    @Override
-    public <K, V> void forward(final K key, final V value) {
-        delegate.forward(key, value);
-    }
-
-    @Override
-    public <K, V> void forward(final K key, final V value, final To to) {
-        delegate.forward(key, value, to);
-    }
-
-    @Deprecated
-    @Override
-    public <K, V> void forward(final K key, final V value, final int childIndex) {
-        delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name()));
-    }
-
-    @Deprecated
-    @Override
-    public <K, V> void forward(final K key, final V value, final String childName) {
-        delegate.forward(key, value, To.child(childName));
-    }
-
-    @Override
-    public void commit() {
-        delegate.commit();
-    }
-
-    @Override
-    public String topic() {
-        return delegate.topic();
-    }
-
-    @Override
-    public int partition() {
-        return delegate.partition();
-    }
-
-    @Override
-    public long offset() {
-        return delegate.offset();
-    }
-
-    @Override
-    public Headers headers() {
-        return delegate.headers();
-    }
-
-    @Override
-    public long timestamp() {
-        return delegate.timestamp();
-    }
-
-    @Override
-    public Map<String, Object> appConfigs() {
-        return delegate.appConfigs();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
-        return delegate.appConfigsWithPrefix(prefix);
-    }
-
-    InternalApiProcessorContext<Object, Object> delegate() {
-        return delegate;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 122a3bc..2939525 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
@@ -29,6 +32,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -104,6 +108,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         childByName.put(child.name, child);
     }
 
+    @SuppressWarnings("unchecked")
     public void init(final InternalProcessorContext context) {
         if (!closed)
             throw new IllegalStateException("The processor is not closed");
@@ -114,7 +119,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init(ProcessorContextAdapter.adapt(context));
+                        processor.init((ProcessorContext<KOut, VOut>) context);
                     }
                 },
                 time,
@@ -171,14 +176,14 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
     }
 
 
-    public void process(final KIn key, final VIn value) {
+    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
         throwIfClosed();
 
         try {
-            maybeMeasureLatency(() -> processor.process(key, value), time, processSensor);
+            maybeMeasureLatency(() -> processor.process(record, recordMetadata), time, processSensor);
         } catch (final ClassCastException e) {
-            final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
-            final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+            final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
+            final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
             throw new StreamsException(String.format("ClassCastException invoking Processor. Do the Processor's "
                     + "input types match the deserialized types? Check the Serde setup and change the default Serdes in "
                     + "StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept "
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 5dd0062..2e979ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.processor.RecordContext;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
@@ -29,7 +30,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
 
-public class ProcessorRecordContext implements RecordContext {
+public class ProcessorRecordContext implements RecordContext, RecordMetadata {
 
     private final long timestamp;
     private final long offset;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index aa69752..4948ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -197,7 +198,7 @@ public class ProcessorStateManager implements StateManager {
             if (stores.containsKey(store.name())) {
                 maybeRegisterStoreWithChangelogReader(store.name());
             } else {
-                store.init(processorContext, store);
+                store.init((StateStoreContext) processorContext, store);
             }
             log.trace("Registered state store {}", store.name());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index a77deb8..f8840e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -21,6 +21,10 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+
+import java.util.Optional;
 
 public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
@@ -76,19 +80,43 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
         }
     }
 
-
     @Override
-    public void process(final KIn key, final VIn value) {
+    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
         final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
 
-        final long timestamp = context.timestamp();
+        final KIn key = record.key();
+        final VIn value = record.value();
+
+        final long timestamp = record.timestamp();
         if (timestamp < 0) {
-            throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
+            throw new StreamsException(
+                "Invalid (negative) timestamp of "
+                    + timestamp
+                    + " for output record <" + key + ":" + value + ">."
+            );
         }
 
-        final String topic = topicExtractor.extract(key, value, this.context.recordContext());
+        // Prefer the record metadata if defined,
+        // and fall back to the context (which holds undefined, dummy values,
+        // but extractors may still depend on the current behavior).
+        final Optional<ProcessorRecordContext> maybeContext =
+            recordMetadata.map(
+                m -> new ProcessorRecordContext(timestamp, m.offset(), m.partition(), m.topic(), record.headers())
+            );
+        final ProcessorRecordContext contextForExtraction =
+            maybeContext.orElseGet(
+                () -> new ProcessorRecordContext(
+                    timestamp,
+                    context.offset(),
+                    context.partition(),
+                    context.topic(),
+                    record.headers()
+                )
+            );
+
+        final String topic = topicExtractor.extract(key, value, contextForExtraction);
 
-        collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
+        collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 6dbdfee..711b4c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -21,8 +21,12 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
+import java.util.Optional;
+
 public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private InternalProcessorContext context;
@@ -92,8 +96,8 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
 
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        context.forward(key, value);
+    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+        context.forward(record);
         processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
similarity index 60%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
index 2b8043a..ae2e42f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
@@ -19,34 +19,33 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
-import java.util.Objects;
 
-/**
- * {@code ProcessorContext} implementation that will throw on any forward call.
- */
-public final class ForwardingDisabledProcessorContext implements ProcessorContext {
-    private final ProcessorContext delegate;
+public final class StoreToProcessorContextAdapter implements ProcessorContext {
+    private final StateStoreContext delegate;
 
-    private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, "
-        + "as the framework must ensure the key is not changed (#forward allows changing the key on "
-        + "messages which are sent). Try another function, which doesn't allow the key to be changed "
-        + "(for example - #tranformValues).";
+    public static ProcessorContext adapt(final StateStoreContext delegate) {
+        if (delegate instanceof ProcessorContext) {
+            return (ProcessorContext) delegate;
+        } else {
+            return new StoreToProcessorContextAdapter(delegate);
+        }
+    }
 
-    public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
-        this.delegate = Objects.requireNonNull(delegate, "delegate");
+    private StoreToProcessorContextAdapter(final StateStoreContext delegate) {
+        this.delegate = delegate;
     }
 
     @Override
@@ -80,81 +79,76 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
     }
 
     @Override
-    public void register(final StateStore store,
-                         final StateRestoreCallback stateRestoreCallback) {
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
         delegate.register(store, stateRestoreCallback);
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
-        return delegate.getStateStore(name);
+    public <S extends StateStore> S getStateStore(final String name) {
+        throw new UnsupportedOperationException("StateStores can't access getStateStore.");
     }
 
-    @Override
     @Deprecated
-    public Cancellable schedule(final long intervalMs,
-                                final PunctuationType type,
-                                final Punctuator callback) {
-        return delegate.schedule(intervalMs, type, callback);
+    @Override
+    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+        throw new UnsupportedOperationException("StateStores can't access schedule.");
     }
 
     @Override
-    public Cancellable schedule(final Duration interval,
-                                final PunctuationType type,
-                                final Punctuator callback) throws IllegalArgumentException {
-        return delegate.schedule(interval, type, callback);
+    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+        throw new UnsupportedOperationException("StateStores can't access schedule.");
     }
 
     @Override
     public <K, V> void forward(final K key, final V value) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
-    @Override
     @Deprecated
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
-    @Override
     @Deprecated
+    @Override
     public <K, V> void forward(final K key, final V value, final String childName) {
-        throw new StreamsException(EXPLANATION);
+        throw new UnsupportedOperationException("StateStores can't access forward.");
     }
 
     @Override
     public void commit() {
-        delegate.commit();
+        throw new UnsupportedOperationException("StateStores can't access commit.");
     }
 
     @Override
     public String topic() {
-        return delegate.topic();
+        throw new UnsupportedOperationException("StateStores can't access topic.");
     }
 
     @Override
     public int partition() {
-        return delegate.partition();
+        throw new UnsupportedOperationException("StateStores can't access partition.");
     }
 
     @Override
     public long offset() {
-        return delegate.offset();
+        throw new UnsupportedOperationException("StateStores can't access offset.");
     }
 
     @Override
     public Headers headers() {
-        return delegate.headers();
+        throw new UnsupportedOperationException("StateStores can't access headers.");
     }
 
     @Override
     public long timestamp() {
-        return delegate.timestamp();
+        throw new UnsupportedOperationException("StateStores can't access timestamp.");
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 271724c..c4e4ff3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -36,6 +36,8 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@@ -54,6 +56,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -670,9 +673,27 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             log.trace("Start processing one record [{}]", record);
 
-            updateProcessorContext(record, currNode, wallClockTime);
+            updateProcessorContext(
+                currNode,
+                wallClockTime,
+                new ProcessorRecordContext(
+                    record.timestamp,
+                    record.offset(),
+                    record.partition(),
+                    record.topic(),
+                    record.headers()
+                )
+            );
+
             maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
-            maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), time, processLatencySensor);
+            final Record<Object, Object> toProcess = new Record<>(
+                record.key(),
+                record.value(),
+                processorContext.timestamp(),
+                processorContext.headers()
+            );
+            final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext());
+            maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor);
 
             log.trace("Completed processing one record [{}]", record);
 
@@ -742,8 +763,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
-        updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null),
-            timestamp), node, time.milliseconds());
+        updateProcessorContext(node, time.milliseconds(), null);
 
         if (log.isTraceEnabled()) {
             log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
@@ -760,14 +780,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?, ?, ?> currNode, final long wallClockTime) {
-        processorContext.setRecordContext(
-            new ProcessorRecordContext(
-                record.timestamp,
-                record.offset(),
-                record.partition(),
-                record.topic(),
-                record.headers()));
+    private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
+                                        final long wallClockTime,
+                                        final ProcessorRecordContext recordContext) {
+        processorContext.setRecordContext(recordContext);
         processorContext.setCurrentNode(currNode);
         processorContext.setSystemTimeMs(wallClockTime);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
index 6c5798e..8865846 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
@@ -23,6 +23,10 @@ public class ToInternal extends To {
         super(To.all());
     }
 
+    public ToInternal(final To to) {
+        super(to);
+    }
+
     public void update(final To to) {
         super.update(to);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 326d277..7ff2c6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -40,6 +40,8 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -76,6 +78,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Executors;
@@ -866,9 +869,9 @@ public class KafkaStreamsTest {
                     }
 
                     @Override
-                    public void process(final String key, final String value) {
-                        if (value.length() % 2 == 0) {
-                            context.forward(key, key + value);
+                    public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                        if (record.value().length() % 2 == 0) {
+                            context.forward(record.withValue(record.key() + record.value()));
                         }
                     }
                 }, "source")
@@ -967,11 +970,11 @@ public class KafkaStreamsTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
                     final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
-                    kvStore.put(key, 5L);
+                    kvStore.put(record.key(), 5L);
 
-                    context.forward(key, "5");
+                    context.forward(record.withValue("5"));
                     context.commit();
                 }
             }, "source")
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 807c50e..b308b4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -39,6 +39,8 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -58,6 +60,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 import static java.util.Arrays.asList;
@@ -106,8 +109,8 @@ public class StreamsBuilderTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
-                    store.put(key, value);
+                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                    store.put(record.key(), record.value());
                 }
             }
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 57e742b..9e9f415 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -44,6 +46,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -371,15 +374,12 @@ public class TopologyTest {
         public Processor<Object, Object, Object, Object> get() {
             return new Processor<Object, Object, Object, Object>() {
                 @Override
-                public void init(final ProcessorContext context) {
+                public void init(final ProcessorContext<Object, Object> context) {
                     context.getStateStore(STORE_NAME);
                 }
 
                 @Override
-                public void process(final Object key, final Object value) { }
-
-                @Override
-                public void close() { }
+                public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { }
             };
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 8a65bc3..ab2adbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.To;
@@ -131,7 +132,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         }
 
         sessionStore = storeBuilder.build();
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
     }
 
     @After
@@ -640,7 +641,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
                 Serdes.Long())
                 .withLoggingDisabled();
         final SessionStore<String, Long> sessionStore = storeBuilder.build();
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
         return context;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 19eb1d2..08bd5e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
@@ -60,7 +61,7 @@ public class KStreamTransformTest {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
-                        timestamp -> context.forward(-1, (int) timestamp)
+                        timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp))
                     );
                 }
 
@@ -126,7 +127,7 @@ public class KStreamTransformTest {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
-                        timestamp -> context.forward(-1, (int) timestamp));
+                        timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp)));
                 }
 
                 @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index dc0b69d..3b46765 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
@@ -243,7 +244,7 @@ public class KTableSuppressProcessorMetricsTest {
         context.setCurrentNode(new ProcessorNode("testNode"));
         context.setSystemTimeMs(time.milliseconds());
 
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         processor.init(context);
 
         final long timestamp = 100L;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 1d1d6fb..778af9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
@@ -85,7 +86,7 @@ public class KTableSuppressProcessorTest {
             final MockInternalProcessorContext context = new MockInternalProcessorContext();
             context.setCurrentNode(new ProcessorNode("testNode"));
 
-            buffer.init(context, buffer);
+            buffer.init((StateStoreContext) context, buffer);
             processor.init(context);
 
             this.processor = processor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index f39d730..4cef063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
@@ -44,6 +45,8 @@ import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 public class AbstractProcessorContextTest {
@@ -81,14 +84,9 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
+    public void shouldReturnNullTopicIfNoRecordContext() {
         context.setRecordContext(null);
-        try {
-            context.topic();
-            fail("should throw illegal state exception when record context is null");
-        } catch (final IllegalStateException e) {
-            // pass
-        }
+        assertThat(context.topic(), is(nullValue()));
     }
 
     @Test
@@ -104,19 +102,14 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfTopicEqualsNonExistTopic() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
+        context.setRecordContext(null);
         assertThat(context.topic(), nullValue());
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() {
+    public void shouldReturnDummyPartitionIfNoRecordContext() {
         context.setRecordContext(null);
-        try {
-            context.partition();
-            fail("should throw illegal state exception when record context is null");
-        } catch (final IllegalStateException e) {
-            // pass
-        }
+        assertThat(context.partition(), is(-1));
     }
 
     @Test
@@ -140,14 +133,9 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() {
+    public void shouldReturnDummyTimestampIfNoRecordContext() {
         context.setRecordContext(null);
-        try {
-            context.timestamp();
-            fail("should throw illegal state exception when record context is null");
-        } catch (final IllegalStateException e) {
-            // pass
-        }
+        assertThat(context.timestamp(), is(0L));
     }
 
     @Test
@@ -161,9 +149,9 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldReturnNullIfHeadersAreNotSet() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
-        assertThat(context.headers(), nullValue());
+    public void shouldReturnEmptyHeadersIfHeadersAreNotSet() {
+        context.setRecordContext(null);
+        assertThat(context.headers(), is(emptyIterable()));
     }
 
     @Test
@@ -211,7 +199,7 @@ public class AbstractProcessorContextTest {
         }
 
         @Override
-        public StateStore getStateStore(final String name) {
+        public <S extends StateStore> S getStateStore(final String name) {
             return null;
         }
 
@@ -231,6 +219,12 @@ public class AbstractProcessorContextTest {
         }
 
         @Override
+        public <K, V> void forward(final Record<K, V> record) {}
+
+        @Override
+        public <K, V> void forward(final Record<K, V> record, final String childName) {}
+
+        @Override
         public <K, V> void forward(final K key, final V value) {}
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index ad8cd0a..6322fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -30,6 +32,7 @@ import org.hamcrest.core.IsInstanceOf;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
@@ -97,11 +100,13 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldForwardToSingleChild() {
-        child.process(null, null);
+        child.process(anyObject(), anyObject());
         expectLastCall();
 
+        expect(recordContext.timestamp()).andStubReturn(0L);
+        expect(recordContext.headers()).andStubReturn(new RecordHeaders());
         replay(child, recordContext);
-        globalContext.forward(null, null);
+        globalContext.forward((Object /*forcing a call to the K/V forward*/) null, null);
         verify(child, recordContext);
     }
 
@@ -142,7 +147,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -151,7 +156,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -160,7 +165,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -169,7 +174,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedWindowStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -178,7 +183,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForSessionStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
-            store.init(null, null);
+            store.init((ProcessorContext) null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 6ce2bae..3cc06be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil.record;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -127,7 +128,7 @@ public class GlobalStateTaskTest {
     @Test
     public void shouldProcessRecordsForTopic() {
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
+        globalStateTask.update(record(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
         assertEquals(1, sourceOne.numReceived);
         assertEquals(0, sourceTwo.numReceived);
     }
@@ -136,7 +137,7 @@ public class GlobalStateTaskTest {
     public void shouldProcessRecordsForOtherTopic() {
         final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic2, 1, 1, integerBytes, integerBytes));
+        globalStateTask.update(record(topic2, 1, 1, integerBytes, integerBytes));
         assertEquals(1, sourceTwo.numReceived);
         assertEquals(0, sourceOne.numReceived);
     }
@@ -215,7 +216,7 @@ public class GlobalStateTaskTest {
         expectedOffsets.put(t1, 52L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.update(record(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.flushState();
         assertEquals(expectedOffsets, stateMgr.changelogOffsets());
     }
@@ -226,7 +227,7 @@ public class GlobalStateTaskTest {
         expectedOffsets.put(t1, 102L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 101, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.update(record(topic1, 1, 101, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.flushState();
         assertThat(stateMgr.changelogOffsets(), equalTo(expectedOffsets));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 7d5773b..a69bd12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -49,6 +48,7 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
+import static org.apache.kafka.streams.processor.internals.testutil.ConsumerRecordUtil.record;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -253,7 +253,7 @@ public class GlobalStreamThreadTest {
             "Thread never started.");
 
         mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 1L));
-        mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
+        mockConsumer.addRecord(record(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
 
         TestUtils.waitForCondition(
             () -> mockConsumer.position(topicPartition) == 1L,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index f4b62c3d..ab88efa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -767,7 +767,7 @@ public class ProcessorContextImplTest {
         assertTrue(store.persistent());
         assertTrue(store.isOpen());
 
-        checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+        checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()");
         checkThrowsUnsupportedOperation(store::close, "close()");
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 0d79ae5..a4efcbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Optional;
 import java.util.Properties;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
@@ -28,6 +31,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -202,7 +206,10 @@ public class ProcessorNodeTest {
         final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
         final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
-        final StreamsException se = assertThrows(StreamsException.class, () -> node.process("aKey", "aValue"));
+        final StreamsException se = assertThrows(
+            StreamsException.class,
+            () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()), Optional.ofNullable(context.recordContext()))
+        );
         assertThat(se.getCause(), instanceOf(ClassCastException.class));
         assertThat(se.getMessage(), containsString("default Serdes"));
         assertThat(se.getMessage(), containsString("input types"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 448c2b1..439cd59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
@@ -295,7 +296,7 @@ public class ProcessorStateManagerTest {
         expect(store.name()).andStubReturn(persistentStoreName);
 
         context.uninitialize();
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         replay(storeMetadata, context, store);
 
         stateMgr.registerStateStores(singletonList(store), context);
@@ -325,7 +326,7 @@ public class ProcessorStateManagerTest {
         expect(store.name()).andStubReturn(persistentStoreName);
 
         context.uninitialize();
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         replay(storeMetadata, context, store);
 
         stateMgr.registerStateStores(singletonList(store), context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 77dc6af..0b7c1b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -35,10 +35,11 @@ import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -54,6 +55,7 @@ import java.io.File;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.function.Supplier;
@@ -775,8 +777,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value);
+        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+            context.forward(record);
         }
     }
 
@@ -792,8 +794,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value, To.all().withTimestamp(context.timestamp() + 10));
+        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+            context.forward(record.withTimestamp(record.timestamp() + 10));
         }
     }
 
@@ -814,11 +816,11 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value);
-            context.forward(key, value, To.child(firstChild).withTimestamp(context.timestamp() + 5));
-            context.forward(key, value, To.child(secondChild));
-            context.forward(key, value, To.all().withTimestamp(context.timestamp() + 2));
+        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+            context.forward(record);
+            context.forward(record.withTimestamp(record.timestamp() + 5), firstChild);
+            context.forward(record, secondChild);
+            context.forward(record.withTimestamp(record.timestamp() + 2));
         }
     }
 
@@ -831,9 +833,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.headers().add(HEADER);
-            context.forward(key, value);
+        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+            context.forward(record.withHeaders(record.headers().add(HEADER)));
         }
     }
 
@@ -850,8 +851,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value.split("@")[0]);
+        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+            context.forward(record.withValue(record.value().split("@")[0]));
         }
     }
 
@@ -935,8 +936,8 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            store.put(key, value);
+        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+            store.put(record.key(), record.value());
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 6234b0f..bc6f08b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
@@ -50,7 +52,7 @@ public class SinkNodeTest {
         // When/Then
         context.setTime(-1); // ensures a negative timestamp is set for the record we send next
         try {
-            illTypedSink.process("any key".getBytes(), "any value".getBytes());
+            illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()), java.util.Optional.empty());
             fail("Should have thrown StreamsException");
         } catch (final StreamsException ignored) {
             // expected
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d0c5804..b54aa6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -50,6 +50,8 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -78,6 +80,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -132,7 +135,7 @@ public class StreamTaskTest {
     private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
         @Override
-        public void process(final Integer key, final Integer value) {
+        public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
             throw new RuntimeException("KABOOM!");
         }
 
@@ -466,10 +469,11 @@ public class StreamTaskTest {
                 this.context = context;
                 super.init(context);
             }
+
             @Override
-            public void process(final Integer key, final Integer value) {
-                if (key % 2 == 0) {
-                    context.forward(key, value);
+            public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+                if (record.key() % 2 == 0) {
+                    context.forward(record);
                 }
             }
         };
@@ -1230,10 +1234,10 @@ public class StreamTaskTest {
         task.completeRestoration();
 
         task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            task.processorContext().recordContext().headers().add("dummy", (byte[]) null);
+            task.processorContext().headers().add("dummy", (byte[]) null);
         });
         task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            assertFalse(task.processorContext().recordContext().headers().iterator().hasNext());
+            assertFalse(task.processorContext().headers().iterator().hasNext());
         });
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 575802e..6520778 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -63,6 +63,8 @@ import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -92,6 +94,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -1201,9 +1204,8 @@ public class StreamThreadTest {
         internalTopologyBuilder.addProcessor(
             "proc",
             () -> new Processor<Object, Object, Object, Object>() {
-
                 @Override
-                public void process(final Object key, final Object value) {
+                public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
                     if (shouldThrow.get()) {
                         throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
                     } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
new file mode 100644
index 0000000..a702fdc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+public final class ConsumerRecordUtil {
+    private ConsumerRecordUtil() {} // static helper holder; never instantiated
+
+    public static <K, V> ConsumerRecord<K, V> record(final String topic,
+                                                     final int partition,
+                                                     final long offset,
+                                                     final K key,
+                                                     final V value) {
+        // ConsumerRecord's constructor without timestamp arguments initializes
+        // the timestamp to -1, which is an invalid configuration. Here, we use
+        // a fuller constructor so that we can initialize the timestamp to 0.
+        return new ConsumerRecord<>(
+            topic,
+            partition,
+            offset,
+            0L, // timestamp: 0 instead of the -1 default
+            TimestampType.CREATE_TIME,
+            0L, // presumably the checksum placeholder; confirm against the ConsumerRecord signature
+            0, // presumably the serialized key size placeholder; confirm
+            0, // presumably the serialized value size placeholder; confirm
+            key,
+            value
+        );
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 4b3f9d5..61d27b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -130,7 +131,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
             new MockRecordCollector(),
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
         );
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
     }
 
     @After
@@ -287,7 +288,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
         bytesStore = getBytesStore();
 
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
         assertThat(
             results,
@@ -317,7 +318,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
         bytesStore = getBytesStore();
 
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
         assertThat(
             results,
@@ -336,7 +337,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         // need to create a segment so we can attempt to write to it again.
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
         bytesStore.close();
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
     }
 
@@ -365,7 +366,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
     }
 
     private void shouldRestoreToByteStore(final TaskType taskType) {
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
         // 0 segments initially.
         assertEquals(0, bytesStore.getSegments().size());
         final String key = "a";
@@ -405,7 +406,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         );
         final Time time = new SystemTime();
         context.setSystemTimeMs(time.milliseconds());
-        bytesStore.init(context, bytesStore);
+        bytesStore.init((StateStoreContext) context, bytesStore);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
             // write a record to advance stream time, with a high enough timestamp
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index ce3aa86..bb425a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -98,7 +99,7 @@ public abstract class AbstractSessionBytesStoreTest {
                 new MockStreamsMetrics(new Metrics())));
         context.setTime(1L);
 
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
     }
 
     @After
@@ -263,7 +264,7 @@ public abstract class AbstractSessionBytesStoreTest {
     @Test
     public void shouldFetchExactKeys() {
         sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long());
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
 
         sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
         sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
@@ -302,7 +303,7 @@ public abstract class AbstractSessionBytesStoreTest {
         final SessionStore<Bytes, String> sessionStore =
             buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String());
 
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
 
         final Bytes key1 = Bytes.wrap(new byte[] {0});
         final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
@@ -427,7 +428,7 @@ public abstract class AbstractSessionBytesStoreTest {
         final Time time = new SystemTime();
         context.setTime(1L);
         context.setSystemTimeMs(time.milliseconds());
-        sessionStore.init(context, sessionStore);
+        sessionStore.init((StateStoreContext) context, sessionStore);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
             // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index a2c6b7a..6aeb28d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -111,7 +112,7 @@ public abstract class AbstractWindowBytesStoreTest {
                 new MockStreamsMetrics(new Metrics())));
         context.setTime(1L);
 
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
     }
 
     @After
@@ -713,7 +714,7 @@ public abstract class AbstractWindowBytesStoreTest {
     @SuppressWarnings("deprecation")
     public void testPutSameKeyTimestamp() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         final long startTime = SEGMENT_INTERVAL - 4L;
 
@@ -797,7 +798,7 @@ public abstract class AbstractWindowBytesStoreTest {
             Serdes.String(),
             Serdes.String());
 
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         windowStore.put("a", "0001", 0);
         windowStore.put("aa", "0002", 0);
@@ -882,7 +883,7 @@ public abstract class AbstractWindowBytesStoreTest {
             true,
             Serdes.Bytes(),
             Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         final Bytes key1 = Bytes.wrap(new byte[] {0});
         final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
@@ -970,7 +971,7 @@ public abstract class AbstractWindowBytesStoreTest {
         final Time time = new SystemTime();
         context.setSystemTimeMs(time.milliseconds());
         context.setTime(1L);
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
             // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
@@ -1111,7 +1112,7 @@ public abstract class AbstractWindowBytesStoreTest {
     @SuppressWarnings("deprecation")
     public void testFetchDuplicates() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         long currentTime = 0;
         setCurrentTime(currentTime);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index d0e10d5..98f0ba6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -77,7 +78,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(null, null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        store.init(context, null);
+        store.init((StateStoreContext) context, null);
     }
 
     @After
@@ -178,7 +179,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         cache = EasyMock.niceMock(ThreadCache.class);
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index d8e97b8..f36629d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -90,7 +91,7 @@ public class CachingSessionStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     @After
@@ -186,7 +187,7 @@ public class CachingSessionStoreTest {
         cache = EasyMock.niceMock(ThreadCache.class);
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 6c4ddf6..42b750b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -104,7 +105,7 @@ public class CachingWindowStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     @After
@@ -886,7 +887,7 @@ public class CachingWindowStoreTest {
         cache = EasyMock.createNiceMock(ThreadCache.class);
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
-        cachingStore.init(context, cachingStore);
+        cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
     private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String key, final String value, final long timestamp) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index d2d1d73..9106580 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
@@ -55,7 +56,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
             collector,
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 5ab035c..426a334 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.SessionStore;
@@ -55,11 +57,11 @@ public class ChangeLoggingSessionBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index f630abb..e05b171 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -60,7 +61,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
             collector,
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 4a240b1..9de2207 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -57,11 +59,11 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 0728a1e..c877ac6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -55,11 +57,11 @@ public class ChangeLoggingWindowBytesStoreTest {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);
         EasyMock.expectLastCall();
         EasyMock.replay(inner, context);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 27dcff4..736721a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -79,7 +80,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
             Serdes.String(), Serdes.String()), new MockRecordCollector());
         context.setTime(1L);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         return store;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 62059f1..fd427e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -100,7 +101,7 @@ public class GlobalStateStoreProviderTest {
         expect(mockContext.recordCollector()).andStubReturn(null);
         replay(mockContext);
         for (final StateStore store : stores.values()) {
-            store.init(mockContext, null);
+            store.init((StateStoreContext) mockContext, null);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index f639255..83fffef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -154,7 +155,7 @@ public class MeteredKeyValueStoreTest {
 
     private void init() {
         replay(inner, context);
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
     }
 
     @Test
@@ -190,7 +191,7 @@ public class MeteredKeyValueStoreTest {
             keySerde,
             valueSerde
         );
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
 
         metered.get(KEY);
         metered.put(KEY, VALUE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 28efbf9..0ff822e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -158,7 +159,7 @@ public class MeteredSessionStoreTest {
 
     private void init() {
         replay(innerStore, context);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
     }
 
     @Test
@@ -195,7 +196,7 @@ public class MeteredSessionStoreTest {
             valueSerde,
             new MockTime()
         );
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP);
         store.put(WINDOWED_KEY, VALUE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index c03ffef..3d28266 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -158,7 +159,7 @@ public class MeteredTimestampedKeyValueStoreTest {
 
     private void init() {
         replay(inner, context);
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
     }
 
     @Test
@@ -194,7 +195,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             keySerde,
             valueSerde
         );
-        metered.init(context, metered);
+        metered.init((StateStoreContext) context, metered);
 
         metered.get(KEY);
         metered.put(KEY, VALUE_AND_TIMESTAMP);
@@ -430,7 +431,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             null
         );
         replay(inner, context);
-        store.init(context, inner);
+        store.init((StateStoreContext) context, inner);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
@@ -455,7 +456,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             new ValueAndTimestampSerde<>(Serdes.Long())
         );
         replay(inner, context);
-        store.init(context, inner);
+        store.init((StateStoreContext) context, inner);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 18be067..9a9d763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -129,7 +130,7 @@ public class MeteredTimestampedWindowStoreTest {
             keySerde,
             valueSerde
         );
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.fetch(KEY, TIMESTAMP);
         store.put(KEY, VALUE_AND_TIMESTAMP, TIMESTAMP);
@@ -143,7 +144,7 @@ public class MeteredTimestampedWindowStoreTest {
         EasyMock.expectLastCall();
         EasyMock.replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.close();
         EasyMock.verify(innerStoreMock);
     }
@@ -153,7 +154,7 @@ public class MeteredTimestampedWindowStoreTest {
         EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
         EasyMock.replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         assertNull(store.fetch("a", 0));
     }
 
@@ -170,7 +171,7 @@ public class MeteredTimestampedWindowStoreTest {
             null,
             null
         );
-        store.init(context, innerStoreMock);
+        store.init((StateStoreContext) context, innerStoreMock);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
@@ -195,7 +196,7 @@ public class MeteredTimestampedWindowStoreTest {
             Serdes.String(),
             new ValueAndTimestampSerde<>(Serdes.Long())
         );
-        store.init(context, innerStoreMock);
+        store.init((StateStoreContext) context, innerStoreMock);
 
         try {
             store.put("key", ValueAndTimestamp.make(42L, 60000));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 8671521..7301694 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -184,7 +185,7 @@ public class MeteredWindowStoreTest {
             keySerde,
             valueSerde
         );
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.fetch(KEY, TIMESTAMP);
         store.put(KEY, VALUE, TIMESTAMP);
@@ -195,7 +196,7 @@ public class MeteredWindowStoreTest {
     @Test
     public void testMetrics() {
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         final JmxReporter reporter = new JmxReporter();
         final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
         reporter.contextChange(metricsContext);
@@ -225,10 +226,10 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordRestoreLatencyOnInit() {
-        innerStoreMock.init(context, store);
+        innerStoreMock.init((StateStoreContext) context, store);
         expectLastCall();
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
             assertEquals(1.0, getMetricByNameFilterByTags(
@@ -254,7 +255,7 @@ public class MeteredWindowStoreTest {
         expectLastCall();
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.put("a", "a");
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -279,7 +280,7 @@ public class MeteredWindowStoreTest {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -304,7 +305,7 @@ public class MeteredWindowStoreTest {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -330,7 +331,7 @@ public class MeteredWindowStoreTest {
         expectLastCall();
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         store.flush();
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -355,7 +356,7 @@ public class MeteredWindowStoreTest {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
         replay(innerStoreMock);
 
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
         assertNull(store.fetch("a", 0));
     }
 
@@ -393,7 +394,7 @@ public class MeteredWindowStoreTest {
         innerStoreMock.close();
         expectLastCall();
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         store.close();
         verify(innerStoreMock);
@@ -404,7 +405,7 @@ public class MeteredWindowStoreTest {
         innerStoreMock.close();
         expectLastCall();
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         assertThat(storeMetrics(), not(empty()));
         store.close();
@@ -417,7 +418,7 @@ public class MeteredWindowStoreTest {
         innerStoreMock.close();
         expectLastCall().andThrow(new RuntimeException("Oops!"));
         replay(innerStoreMock);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);
 
         // There's always a "count" metric registered
         assertThat(storeMetrics(), not(empty()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index ca28181..dead979 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -274,7 +275,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8));
         rocksDBStore.flush();
 
@@ -297,7 +298,7 @@ public class RocksDBStoreTest {
     public void shouldCallRocksDbConfigSetter() {
         MockRocksDbConfigSetter.called = false;
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
@@ -325,7 +326,7 @@ public class RocksDBStoreTest {
             new Bytes(stringSerializer.serialize(null, "3")),
             stringSerializer.serialize(null, "c")));
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         rocksDBStore.putAll(entries);
         rocksDBStore.flush();
 
@@ -350,7 +351,7 @@ public class RocksDBStoreTest {
     public void shouldRestoreAll() {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
         assertEquals(
@@ -372,7 +373,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldPutOnlyIfAbsentValue() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
         final byte[] valueBytes = stringSerializer.serialize(null, "A");
         final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B");
@@ -389,7 +390,7 @@ public class RocksDBStoreTest {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
         entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
         final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
@@ -413,7 +414,7 @@ public class RocksDBStoreTest {
         // this will restore key "1" as WriteBatch applies updates in order
         entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8)));
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
 
         final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
@@ -446,7 +447,7 @@ public class RocksDBStoreTest {
     public void shouldRestoreThenDeleteOnRestoreAll() {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         context.restore(rocksDBStore.name(), entries);
 
@@ -486,7 +487,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPut() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
@@ -494,7 +495,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPutAll() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
@@ -502,7 +503,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullGet() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.get(null));
@@ -510,7 +511,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnDelete() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.delete(null));
@@ -518,7 +519,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnRange() {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         assertThrows(
             NullPointerException.class,
             () -> rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2"))));
@@ -526,7 +527,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         Utils.delete(dir);
         rocksDBStore.put(
             new Bytes(stringSerializer.serialize(null, "anyKey")),
@@ -545,7 +546,7 @@ public class RocksDBStoreTest {
             new StreamsConfig(props));
 
         enableBloomFilters = false;
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         final List<String> expectedValues = new ArrayList<>();
         expectedValues.add("a");
@@ -570,7 +571,7 @@ public class RocksDBStoreTest {
         // reopen with Bloom Filters enabled
         // should open fine without errors
         enableBloomFilters = true;
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
             final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
@@ -596,7 +597,7 @@ public class RocksDBStoreTest {
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
         EasyMock.replay(context);
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         final byte[] key = "hello".getBytes();
         final byte[] value = "world".getBytes();
         rocksDBStore.put(Bytes.wrap(key), value);
@@ -628,7 +629,7 @@ public class RocksDBStoreTest {
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
         EasyMock.replay(context);
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         final byte[] key = "hello".getBytes();
         final byte[] value = "world".getBytes();
         rocksDBStore.put(Bytes.wrap(key), value);
@@ -659,7 +660,7 @@ public class RocksDBStoreTest {
         EasyMock.expect(context.stateDir()).andStubReturn(dir);
         EasyMock.replay(context);
 
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
         final List<String> propertyNames = Arrays.asList(
             "num-entries-active-mem-table",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 042039c..75b4e80 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.hamcrest.core.IsNull;
@@ -49,7 +50,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
     @Test
     public void shouldOpenNewStoreInRegularMode() {
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
         }
@@ -62,13 +63,13 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
     @Test
     public void shouldOpenExistingStoreInRegularMode() throws Exception {
         // prepare store
-        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
         rocksDBStore.put(new Bytes("key".getBytes()), "timestamped".getBytes());
         rocksDBStore.close();
 
         // re-open store
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
         } finally {
@@ -121,7 +122,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
         prepareOldStore();
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
         }
@@ -404,7 +405,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
         // check that still in upgrade mode
         try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
         } finally {
@@ -438,7 +439,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
         // check that still in regular mode
         try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStore.class)) {
-            rocksDBStore.init(context, rocksDBStore);
+            rocksDBStore.init((StateStoreContext) context, rocksDBStore);
 
             assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
         }
@@ -447,7 +448,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
     private void prepareOldStore() {
         final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
         try {
-            keyValueStore.init(context, keyValueStore);
+            keyValueStore.init((StateStoreContext) context, keyValueStore);
 
             keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes());
             keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c7021fd..9e9b0b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -389,7 +390,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
 
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(),
             Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         context.setTime(0L);
         setCurrentTime(0);
@@ -479,7 +480,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
         windowStore.close();
 
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         // put something in the store to advance its stream time and expire the old segments
         windowStore.put(1, "v", 6L * SEGMENT_INTERVAL);
@@ -546,7 +547,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                                        false,
                                        Serdes.Integer(),
                                        Serdes.String());
-        windowStore.init(context, windowStore);
+        windowStore.init((StateStoreContext) context, windowStore);
 
         assertEquals(
             new HashSet<>(Collections.emptyList()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 7df1404..97593e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -63,8 +64,8 @@ public class SegmentIteratorTest {
                 new LogContext("testCache "),
                 0,
                 new MockStreamsMetrics(new Metrics())));
-        segmentOne.init(context, segmentOne);
-        segmentTwo.init(context, segmentTwo);
+        segmentOne.init((StateStoreContext) context, segmentOne);
+        segmentTwo.init((StateStoreContext) context, segmentTwo);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
         segmentOne.put(Bytes.wrap("b".getBytes()), "2".getBytes());
         segmentTwo.put(Bytes.wrap("c".getBytes()), "3".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index a054ac9..0037fbb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -123,7 +124,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldInit() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         cleanup(context, buffer);
     }
 
@@ -131,7 +132,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldAcceptData() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
         cleanup(context, buffer);
     }
@@ -140,7 +141,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRejectNullValues() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         try {
             buffer.put(0, "asdf", null, getContext(0));
             fail("expected an exception");
@@ -154,7 +155,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRemoveData() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
         assertThat(buffer.numRecords(), is(1));
         buffer.evictWhile(() -> true, kv -> { });
@@ -166,7 +167,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRespectEvictionPredicate() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
         putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
         assertThat(buffer.numRecords(), is(2));
@@ -183,7 +184,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldTrackCount() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "oin");
         assertThat(buffer.numRecords(), is(1));
         putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
@@ -197,7 +198,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldTrackSize() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
         assertThat(buffer.bufferSize(), is(43L));
         putRecord(buffer, context, 1L, 0L, "asdf", "3l");
@@ -211,7 +212,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldTrackMinTimestamp() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
         assertThat(buffer.minTimestamp(), is(1L));
         putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
@@ -223,7 +224,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
@@ -269,7 +270,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldReturnUndefinedOnPriorValueForNotBufferedKey() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         assertThat(buffer.priorValueForBuffered("ASDF"), is(Maybe.undefined()));
     }
@@ -278,7 +279,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldReturnPriorValueForBufferedKey() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final ProcessorRecordContext recordContext = getContext(0L);
         context.setRecordContext(recordContext);
@@ -292,7 +293,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldFlush() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
         putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
         putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
         putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
@@ -363,7 +364,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreOldUnversionedFormat() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -477,7 +478,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV1Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -598,7 +599,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV2Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -721,7 +722,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         // Note the data is the same as the V3 test.
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -841,7 +842,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV3Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
@@ -961,7 +962,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     public void shouldNotRestoreUnrecognizedVersionRecord() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
+        buffer.init((StateStoreContext) context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 51b9231..bd82e29 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -54,6 +55,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
 
@@ -261,9 +263,10 @@ public class InternalMockProcessorContext
         stateManager().registerStore(store, func);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
-        return storeMap.get(name);
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) storeMap.get(name);
     }
 
     @Override
@@ -283,6 +286,40 @@ public class InternalMockProcessorContext
     public void commit() {}
 
     @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        // a null child name addresses every child of the current node
+        forward(record, null);
+    }
+
+    /**
+     * Forwards {@code record} to the children of the current node.
+     *
+     * @param record    the record to pass downstream
+     * @param childName the only child that should receive the record, or {@code null} to send it to all children
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        // a record timestamp that differs from the current record context is pushed into the context first
+        if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
+            setTime(record.timestamp());
+        }
+        final ProcessorNode<?, ?, ?, ?> thisNode = currentNode;
+        try {
+            for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
+                // honor the requested child, using the same filter as the To-based forward in this class
+                if (childName == null || childName.equals(childNode.name())) {
+                    currentNode = childNode;
+                    ((ProcessorNode<K, V, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+                }
+            }
+        } finally {
+            // restore the current node even if a child throws
+            currentNode = thisNode;
+        }
+    }
+
+    @Override
     public void forward(final Object key, final Object value) {
         forward(key, value, To.all());
     }
@@ -311,7 +348,8 @@ public class InternalMockProcessorContext
             for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
                 if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
                     currentNode = childNode;
-                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(key, value);
+                    final Record<Object, Object> record = new Record<>(key, value, toInternal.timestamp(), headers());
+                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
                     toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
                                            // Processors and toInternal might have been modified
                 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index 262aecc..8bed338 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -21,6 +21,8 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.time.Duration;
@@ -28,6 +30,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -65,25 +68,19 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
             scheduleCancellable = context.schedule(
                 Duration.ofMillis(scheduleInterval),
                 punctuationType,
-                timestamp -> {
-                    if (punctuationType == PunctuationType.STREAM_TIME) {
-                        assertThat(context.timestamp(), is(timestamp));
-                    }
-                    assertThat(context.partition(), is(-1));
-                    assertThat(context.offset(), is(-1L));
-
-                    (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
-                        .add(timestamp);
-                });
+                (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)::add
+            );
         }
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, context.timestamp());
+    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+        final KIn key = record.key();
+        final VIn value = record.value();
+        final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
 
         if (value != null) {
-            lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context.timestamp()));
+            lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, record.timestamp()));
         } else {
             lastValueAndTimestampPerKey.remove(key);
         }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 58b90c1..82b24d1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -21,6 +21,8 @@ import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -67,6 +69,16 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     }
 
     @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record.key(), record.value(), To.all().withTimestamp(record.timestamp()));
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        forward(record.key(), record.value(), To.child(childName).withTimestamp(record.timestamp()));
+    }
+
+    @Override
     public ProcessorRecordContext recordContext() {
         return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 17391bc..6c653c3 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -21,16 +21,18 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorContextAdapter;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     private final MockApiProcessor<K, V, Object, Object> delegate;
+    private InternalProcessorContext internalProcessorContext;
 
     public MockProcessor(final PunctuationType punctuationType,
                          final long scheduleInterval) {
@@ -41,15 +43,18 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
         delegate = new MockApiProcessor<>();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context) {
         super.init(context);
-        delegate.init(ProcessorContextAdapter.adapt((InternalProcessorContext) context));
+        internalProcessorContext = (InternalProcessorContext) context;
+        delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context);
     }
 
     @Override
     public void process(final K key, final V value) {
-        delegate.process(key, value);
+        final Record<K, V> record = new Record<>(key, value, context.timestamp(), context.headers());
+        delegate.process(record, Optional.ofNullable(internalProcessorContext.recordContext()));
     }
 
     public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 9a8b407..90fd905 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -17,10 +17,13 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -58,8 +61,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        processor().process(key, value);
+    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+        processor().process(record, recordMetadata);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 5130d46..4c3fed1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -17,10 +17,13 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
 import java.util.ArrayList;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
@@ -39,10 +42,10 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
     }
 
     @Override
-    public void process(final KIn key, final VIn value) {
-        this.numReceived++;
-        this.keys.add(key);
-        this.values.add(value);
+    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+        numReceived++;
+        keys.add(record.key());
+        values.add(record.value());
     }
 
     @Override
@@ -54,6 +57,6 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
     @Override
     public void close() {
         super.close();
-        this.closed = true;
+        closed = true;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index b2b62ee..374f8ec 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 
@@ -65,7 +66,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         return null;
     }
 
@@ -85,13 +86,23 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record.key(), record.value());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        forward(record.key(), record.value());
+    }
+
+    @Override
     public <K, V> void forward(final K key, final V value) {
         forwardedValues.put(key, value);
     }
 
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        forwardedValues.put(key, value);
+        forward(key, value);
     }
 
     @Override
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 54b09de..1b17f67 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -59,7 +58,6 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -444,13 +442,7 @@ public class TopologyTestDriver implements Closeable {
                 new LogAndContinueExceptionHandler()
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(new ProcessorRecordContext(
-                0L,
-                -1L,
-                -1,
-                ProcessorContextImpl.NONEXIST_TOPIC,
-                new RecordHeaders())
-            );
+            globalProcessorContext.setRecordContext(null);
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -510,13 +502,7 @@ public class TopologyTestDriver implements Closeable {
             );
             task.initializeIfNeeded();
             task.completeRestoration();
-            task.processorContext().setRecordContext(new ProcessorRecordContext(
-                0L,
-                -1L,
-                -1,
-                ProcessorContextImpl.NONEXIST_TOPIC,
-                new RecordHeaders())
-            );
+            task.processorContext().setRecordContext(null);
         } else {
             task = null;
         }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 8e546e0..88e1660 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -401,9 +401,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         stateStores.put(store.name(), store);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
-        return stateStores.get(name);
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
     }
 
     @Override
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 7ae43ef..48783a6 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -41,10 +41,11 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -71,6 +72,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,7 +150,7 @@ public class TopologyTestDriverTest {
         }
     }
 
-    private final static class Record {
+    private final static class TTDTestRecord {
         private final Object key;
         private final Object value;
         private final long timestamp;
@@ -156,8 +158,8 @@ public class TopologyTestDriverTest {
         private final String topic;
         private final Headers headers;
 
-        Record(final ConsumerRecord<byte[], byte[]> consumerRecord,
-               final long newOffset) {
+        TTDTestRecord(final ConsumerRecord<byte[], byte[]> consumerRecord,
+                      final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
             timestamp = consumerRecord.timestamp();
@@ -166,9 +168,9 @@ public class TopologyTestDriverTest {
             headers = consumerRecord.headers();
         }
 
-        Record(final String newTopic,
-               final TestRecord<byte[], byte[]> consumerRecord,
-               final long newOffset) {
+        TTDTestRecord(final String newTopic,
+                      final TestRecord<byte[], byte[]> consumerRecord,
+                      final long newOffset) {
             key = consumerRecord.key();
             value = consumerRecord.value();
             timestamp = consumerRecord.timestamp();
@@ -177,12 +179,12 @@ public class TopologyTestDriverTest {
             headers = consumerRecord.headers();
         }
 
-        Record(final Object key,
-               final Object value,
-               final Headers headers,
-               final long timestamp,
-               final long offset,
-               final String topic) {
+        TTDTestRecord(final Object key,
+                      final Object value,
+                      final Headers headers,
+                      final long timestamp,
+                      final long offset,
+                      final String topic) {
             this.key = key;
             this.value = value;
             this.headers = headers;
@@ -204,7 +206,7 @@ public class TopologyTestDriverTest {
             if (o == null || getClass() != o.getClass()) {
                 return false;
             }
-            final Record record = (Record) o;
+            final TTDTestRecord record = (TTDTestRecord) o;
             return timestamp == record.timestamp &&
                 offset == record.offset &&
                 Objects.equals(key, record.key) &&
@@ -248,7 +250,7 @@ public class TopologyTestDriverTest {
 
         private boolean initialized = false;
         private boolean closed = false;
-        private final List<Record> processedRecords = new ArrayList<>();
+        private final List<TTDTestRecord> processedRecords = new ArrayList<>();
 
         MockProcessor(final Collection<Punctuation> punctuations) {
             this.punctuations = punctuations;
@@ -264,9 +266,16 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void process(final Object key, final Object value) {
-            processedRecords.add(new Record(key, value, context.headers(), context.timestamp(), context.offset(), context.topic()));
-            context.forward(key, value);
+        public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+            processedRecords.add(new TTDTestRecord(
+                record.key(),
+                record.value(),
+                record.headers(),
+                record.timestamp(),
+                recordMetadata.map(RecordMetadata::offset).orElse(-1L),
+                recordMetadata.map(RecordMetadata::topic).orElse(null)
+            ));
+            context.forward(record);
         }
 
         @Override
@@ -399,8 +408,8 @@ public class TopologyTestDriverTest {
                     }
 
                     @Override
-                    public void process(final Object key, final Object value) {
-                        store.put(key, value);
+                    public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+                        store.put(record.key(), record.value());
                     }
                 }
             );
@@ -578,11 +587,11 @@ public class TopologyTestDriverTest {
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
-        final List<Record> processedRecords = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords = mockProcessors.get(0).processedRecords;
         assertEquals(1, processedRecords.size());
 
-        final Record record = processedRecords.get(0);
-        final Record expectedResult = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
+        final TTDTestRecord record = processedRecords.get(0);
+        final TTDTestRecord expectedResult = new TTDTestRecord(SOURCE_TOPIC_1, testRecord1, 0L);
 
         assertThat(record, equalTo(expectedResult));
     }
@@ -598,16 +607,16 @@ public class TopologyTestDriverTest {
     public void shouldSendRecordViaCorrectSourceTopicDeprecated() {
         testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
 
-        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
-        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+        final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
 
         testDriver.pipeInput(consumerRecord1);
 
         assertEquals(1, processedRecords1.size());
         assertEquals(0, processedRecords2.size());
 
-        Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1, 0L);
+        TTDTestRecord record = processedRecords1.get(0);
+        TTDTestRecord expectedResult = new TTDTestRecord(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         testDriver.pipeInput(consumerRecord2);
@@ -616,7 +625,7 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords2.size());
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2, 0L);
+        expectedResult = new TTDTestRecord(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -841,8 +850,8 @@ public class TopologyTestDriverTest {
     public void shouldProcessConsumerRecordList() {
         testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
 
-        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
-        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+        final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
 
         final List<ConsumerRecord<byte[], byte[]>> testRecords = new ArrayList<>(2);
         testRecords.add(consumerRecord1);
@@ -853,12 +862,12 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords1.size());
         assertEquals(1, processedRecords2.size());
 
-        Record record = processedRecords1.get(0);
-        Record expectedResult = new Record(consumerRecord1, 0L);
+        TTDTestRecord record = processedRecords1.get(0);
+        TTDTestRecord expectedResult = new TTDTestRecord(consumerRecord1, 0L);
         assertThat(record, equalTo(expectedResult));
 
         record = processedRecords2.get(0);
-        expectedResult = new Record(consumerRecord2, 0L);
+        expectedResult = new TTDTestRecord(consumerRecord2, 0L);
         assertThat(record, equalTo(expectedResult));
     }
 
@@ -1446,24 +1455,24 @@ public class TopologyTestDriverTest {
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
-            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore());
-            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> flushStore());
+            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
             store = context.getStateStore("aggStore");
         }
 
         @Override
-        public void process(final String key, final Long value) {
-            final Long oldValue = store.get(key);
-            if (oldValue == null || value > oldValue) {
-                store.put(key, value);
+        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+            final Long oldValue = store.get(record.key());
+            if (oldValue == null || record.value() > oldValue) {
+                store.put(record.key(), record.value());
             }
         }
 
-        private void flushStore() {
+        private void flushStore(final long timestamp) {
             try (final KeyValueIterator<String, Long> it = store.all()) {
                 while (it.hasNext()) {
                     final KeyValue<String, Long> next = it.next();
-                    context.forward(next.key, next.value);
+                    context.forward(new Record<>(next.key, next.value, timestamp, new RecordHeaders()));
                 }
             }
         }
@@ -1505,8 +1514,8 @@ public class TopologyTestDriverTest {
                         }
 
                         @Override
-                        public void process(final String key, final Long value) {
-                            store.put(key, value);
+                        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+                            store.put(record.key(), record.value());
                         }
                     };
                 }
@@ -1589,16 +1598,16 @@ public class TopologyTestDriverTest {
 
         testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), config);
 
-        final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords;
-        final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords;
+        final List<TTDTestRecord> processedRecords1 = mockProcessors.get(0).processedRecords;
+        final List<TTDTestRecord> processedRecords2 = mockProcessors.get(1).processedRecords;
 
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
         assertEquals(1, processedRecords1.size());
         assertEquals(0, processedRecords2.size());
 
-        final Record record1 = processedRecords1.get(0);
-        final Record expectedResult1 = new Record(SOURCE_TOPIC_1, testRecord1, 0L);
+        final TTDTestRecord record1 = processedRecords1.get(0);
+        final TTDTestRecord expectedResult1 = new TTDTestRecord(SOURCE_TOPIC_1, testRecord1, 0L);
         assertThat(record1, equalTo(expectedResult1));
 
         pipeRecord(consumerTopic2, consumerRecord2);
@@ -1606,8 +1615,8 @@ public class TopologyTestDriverTest {
         assertEquals(1, processedRecords1.size());
         assertEquals(1, processedRecords2.size());
 
-        final Record record2 = processedRecords2.get(0);
-        final Record expectedResult2 = new Record(consumerTopic2, consumerRecord2, 0L);
+        final TTDTestRecord record2 = processedRecords2.get(0);
+        final TTDTestRecord expectedResult2 = new TTDTestRecord(consumerTopic2, consumerRecord2, 0L);
         assertThat(record2, equalTo(expectedResult2));
     }
 
@@ -1694,11 +1703,12 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                    final String value = record.value();
                     if (!value.startsWith("recurse-")) {
-                        context.forward(key, "recurse-" + value, To.child("recursiveSink"));
+                        context.forward(record.withValue("recurse-" + value), "recursiveSink");
                     }
-                    context.forward(key, value, To.child("sink"));
+                    context.forward(record, "sink");
                 }
             },
             "source"
@@ -1751,8 +1761,8 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
-                    stateStore.put(key, value);
+                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                    stateStore.put(record.key(), record.value());
                 }
             }
         );
@@ -1767,12 +1777,13 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                    final String value = record.value();
                     if (!value.startsWith("recurse-")) {
-                        context.forward(key, "recurse-" + value, To.child("recursiveSink"));
+                        context.forward(record.withValue("recurse-" + value), "recursiveSink");
                     }
-                    context.forward(key, value, To.child("sink"));
-                    context.forward(key, value, To.child("globalSink"));
+                    context.forward(record, "sink");
+                    context.forward(record, "globalSink");
                 }
             },
             "source"