Posted to commits@kafka.apache.org by vv...@apache.org on 2021/09/07 22:19:24 UTC

[kafka] branch trunk updated: KAFKA-13201: Convert KTable suppress to new PAPI (#11213)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f89ce5  KAFKA-13201: Convert KTable suppress to new PAPI (#11213)
5f89ce5 is described below

commit 5f89ce5f202bf560214e00fa1ac7add7a27248cf
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Tue Sep 7 23:17:44 2021 +0100

    KAFKA-13201: Convert KTable suppress to new PAPI (#11213)
    
    Migrate Suppress as part of the migration of KStream/KTable
     operations to the new Processor API (KAFKA-8410)
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../streams/kstream/internals/KTableImpl.java      |   4 +-
 .../suppress/KTableSuppressProcessorSupplier.java  |  35 ++--
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  10 +-
 .../state/internals/TimeOrderedKeyValueBuffer.java |   8 +-
 .../KTableSuppressProcessorMetricsTest.java        |  22 +--
 .../suppress/KTableSuppressProcessorTest.java      | 198 +++++++++++----------
 .../internals/TimeOrderedKeyValueBufferTest.java   |   9 +-
 .../test/MockInternalNewProcessorContext.java      | 198 +++++++++++++++++++++
 .../streams/processor/MockProcessorContext.java    |  11 +-
 9 files changed, 365 insertions(+), 130 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index e681e62..5339b6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -58,6 +58,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressi
 import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
 import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -542,8 +543,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final String storeName =
             suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
 
-        @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-        final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
+        final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
             suppressedInternal,
             storeName,
             this
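
Note (not part of the patch): the new Processor API supplier declares four
type parameters, the input key/value and the output key/value, instead of
two. A minimal sketch of a supplier with the same shape as the suppress
supplier above; the class name is invented for illustration:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;

    // Sketch only. The suppress supplier above has the analogous shape
    // ProcessorSupplier<K, Change<V>, K, Change<V>>.
    public class PassThroughSupplier<K, V> implements ProcessorSupplier<K, V, K, V> {
        @Override
        public Processor<K, V, K, V> get() {
            return new ContextualProcessor<K, V, K, V>() {
                @Override
                public void process(final Record<K, V> record) {
                    // A Record bundles key, value, timestamp, and headers.
                    context().forward(record);
                }
            };
        }
    }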
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index cdad658..ef7943b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -21,10 +21,14 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
-import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableNewProcessorSupplier;
 import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
 import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
 import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
@@ -35,8 +39,7 @@ import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 
 import static java.util.Objects.requireNonNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSupplier<K, V, V> {
+public class KTableSuppressProcessorSupplier<K, V> implements KTableNewProcessorSupplier<K, V, K, V> {
     private final SuppressedInternal<K> suppress;
     private final String storeName;
     private final KTableImpl<K, ?, V> parentKTable;
@@ -52,7 +55,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
+    public Processor<K, Change<V>, K, Change<V>> get() {
         return new KTableSuppressProcessor<>(suppress, storeName);
     }
 
@@ -109,7 +112,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
         return parentKTable.enableSendingOldValues(forceMaterialization);
     }
 
-    private static final class KTableSuppressProcessor<K, V> extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
+    private static final class KTableSuppressProcessor<K, V> extends ContextualProcessor<K, Change<V>, K, Change<V>> {
         private final long maxRecords;
         private final long maxBytes;
         private final long suppressDurationMillis;
@@ -119,7 +122,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
         private final String storeName;
 
         private TimeOrderedKeyValueBuffer<K, V> buffer;
-        private InternalProcessorContext internalProcessorContext;
+        private InternalProcessorContext<K, Change<V>> internalProcessorContext;
         private Sensor suppressionEmitSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
@@ -135,9 +138,9 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
         }
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<K, Change<V>> context) {
             super.init(context);
-            internalProcessorContext = (InternalProcessorContext) context;
+            internalProcessorContext = (InternalProcessorContext<K, Change<V>>) context;
             suppressionEmitSensor = ProcessorNodeMetrics.suppressionEmitSensor(
                 Thread.currentThread().getName(),
                 context.taskId().toString(),
@@ -150,16 +153,16 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
         }
 
         @Override
-        public void process(final K key, final Change<V> value) {
-            observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
-            buffer(key, value);
+        public void process(final Record<K, Change<V>> record) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+            buffer(record);
             enforceConstraints();
         }
 
-        private void buffer(final K key, final Change<V> value) {
-            final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
+        private void buffer(final Record<K, Change<V>> record) {
+            final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, record.key());
 
-            buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
+            buffer.put(bufferTime, record, internalProcessorContext.recordContext());
         }
 
         private void enforceConstraints() {
@@ -198,7 +201,9 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
                 final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
                 internalProcessorContext.setRecordContext(toEmit.recordContext());
                 try {
-                    internalProcessorContext.forward(toEmit.key(), toEmit.value());
+                    internalProcessorContext.forward(toEmit.record()
+                        .withTimestamp(toEmit.recordContext().timestamp())
+                        .withHeaders(toEmit.recordContext().headers()));
                     suppressionEmitSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
                 } finally {
                     internalProcessorContext.setRecordContext(prevRecordContext);
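
Note (not part of the patch): the migration above swaps process(key, value)
for process(Record), so the record timestamp now comes from the Record
itself rather than from ProcessorContext#timestamp(), and buffered entries
are re-emitted with their stored timestamp and headers re-attached via
Record#withTimestamp and Record#withHeaders. A hedged sketch of the
pattern, with an invented class name:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    // Sketch only: track stream time the way the migrated processor does.
    public class StreamTimeTracker<K, V> extends ContextualProcessor<K, V, K, V> {
        private long observedStreamTime = Long.MIN_VALUE;

        @Override
        public void process(final Record<K, V> record) {
            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
            // A no-op here, but in the commit the timestamp and headers come
            // from the buffered ProcessorRecordContext at emit time.
            context().forward(record
                .withTimestamp(record.timestamp())
                .withHeaders(record.headers()));
        }
    }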
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 5a653d6..ba8a745 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -467,14 +468,13 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
 
     @Override
     public void put(final long time,
-                    final K key,
-                    final Change<V> value,
+                    final Record<K, Change<V>> record,
                     final ProcessorRecordContext recordContext) {
-        requireNonNull(value, "value cannot be null");
+        requireNonNull(record.value(), "value cannot be null");
         requireNonNull(recordContext, "recordContext cannot be null");
 
-        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
-        final Change<byte[]> serialChange = valueSerde.serializeParts(changelogTopic, value);
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, record.key()));
+        final Change<byte[]> serialChange = valueSerde.serializeParts(changelogTopic, record.value());
 
         final BufferValue buffered = getBuffered(serializedKey);
         final byte[] serializedPriorValue;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 2466639..e2096ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -47,6 +48,10 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
             return value;
         }
 
+        public Record<K, Change<V>> record() {
+            return new Record<>(key, value, recordContext.timestamp());
+        }
+
         public ProcessorRecordContext recordContext() {
             return recordContext;
         }
@@ -70,6 +75,7 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
         public int hashCode() {
             return Objects.hash(key, value, recordContext);
         }
+
     }
 
     void setSerdesIfNull(final SerdeGetter getter);
@@ -78,7 +84,7 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
 
     Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K key);
 
-    void put(long time, K key, Change<V> value, ProcessorRecordContext recordContext);
+    void put(long time, Record<K, Change<V>> record, ProcessorRecordContext recordContext);
 
     int numRecords();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 4db1342..009a70d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -29,9 +28,11 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -49,7 +50,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.Is.is;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KTableSuppressProcessorMetricsTest {
     private static final long ARBITRARY_LONG = 5L;
     private static final TaskId TASK_ID = new TaskId(0, 0);
@@ -134,7 +134,7 @@ public class KTableSuppressProcessorMetricsTest {
             .build();
 
         final KTableImpl<String, ?, Long> mock = EasyMock.mock(KTableImpl.class);
-        final org.apache.kafka.streams.processor.Processor<String, Change<Long>> processor =
+        final Processor<String, Change<Long>, String, Change<Long>> processor =
             new KTableSuppressProcessorSupplier<>(
                 (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
                 storeName,
@@ -142,8 +142,8 @@ public class KTableSuppressProcessorMetricsTest {
             ).get();
 
         streamsConfig.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
-        final MockInternalProcessorContext context =
-            new MockInternalProcessorContext(streamsConfig, TASK_ID, TestUtils.tempDirectory());
+        final MockInternalNewProcessorContext<String, Change<Long>> context =
+            new MockInternalNewProcessorContext<>(streamsConfig, TASK_ID, TestUtils.tempDirectory());
         final Time time = new SystemTime();
         context.setCurrentNode(new ProcessorNode("testNode"));
         context.setSystemTimeMs(time.milliseconds());
@@ -152,10 +152,11 @@ public class KTableSuppressProcessorMetricsTest {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final String key = "longKey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        processor.process(new Record<>(key, value, timestamp));
 
         final MetricName evictionRateMetric = evictionRateMetricLatest;
         final MetricName evictionTotalMetric = evictionTotalMetricLatest;
@@ -175,8 +176,9 @@ public class KTableSuppressProcessorMetricsTest {
             verifyMetric(metrics, bufferCountMaxMetric, is(1.0));
         }
 
-        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
-        processor.process("key", value);
+        context.setRecordMetadata("", 0, 1L);
+        context.setTimestamp(timestamp + 1);
+        processor.process(new Record<>("key", value, timestamp + 1));
 
         {
             final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 23b91c5..1505d0d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
@@ -29,18 +29,21 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.easymock.EasyMock;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Collection;
 
@@ -59,15 +62,14 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KTableSuppressProcessorTest {
     private static final long ARBITRARY_LONG = 5L;
 
     private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
 
     private static class Harness<K, V> {
-        private final org.apache.kafka.streams.processor.Processor<K, Change<V>> processor;
-        private final MockInternalProcessorContext context;
+        private final Processor<K, Change<V>, K, Change<V>> processor;
+        private final MockInternalNewProcessorContext<K, Change<V>> context;
 
 
         Harness(final Suppressed<K> suppressed,
@@ -81,10 +83,10 @@ public class KTableSuppressProcessorTest {
                 .build();
 
             final KTableImpl<K, ?, V> parent = EasyMock.mock(KTableImpl.class);
-            final org.apache.kafka.streams.processor.Processor<K, Change<V>> processor =
+            final Processor<K, Change<V>, K, Change<V>> processor =
                 new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, storeName, parent).get();
 
-            final MockInternalProcessorContext context = new MockInternalProcessorContext();
+            final MockInternalNewProcessorContext<K, Change<V>> context = new MockInternalNewProcessorContext<>();
             context.setCurrentNode(new ProcessorNode("testNode"));
 
             buffer.init((StateStoreContext) context, buffer);
@@ -99,73 +101,75 @@ public class KTableSuppressProcessorTest {
     public void zeroTimeLimitShouldImmediatelyEmit() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
 
         final long timestamp = ARBITRARY_LONG;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     @Test
     public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ZERO, unbounded()), timeWindowedSerdeFrom(String.class, 100L), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         final long timestamp = ARBITRARY_LONG;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     @Test
     public void intermediateSuppressionShouldBufferAndEmitLater() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
 
         final long timestamp = 0L;
-        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("topic", 0, 0);
+        context.setTimestamp(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
         assertThat(context.forwarded(), hasSize(0));
 
-        context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), 1L);
-        harness.processor.process("tick", new Change<>(null, null));
+        context.setRecordMetadata("topic", 0, 1);
+        context.setTimestamp(1L);
+        harness.processor.process(new Record<>("tick", new Change<>(null, null), 1L));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     @Test
     public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(1L)), timeWindowedSerdeFrom(String.class, 1L), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         final long windowStart = 99L;
         final long recordTime = 99L;
         final long windowEnd = 100L;
-        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), recordTime);
+        context.setRecordMetadata("topic", 0, 0);
+        context.setTimestamp(recordTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, recordTime));
         assertThat(context.forwarded(), hasSize(0));
 
         // although the stream time is now 100, we have to wait 1 ms after the window *end* before we
@@ -173,21 +177,22 @@ public class KTableSuppressProcessorTest {
         final long windowStart2 = 100L;
         final long recordTime2 = 100L;
         final long windowEnd2 = 101L;
-        context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime2);
-        harness.processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
+        context.setRecordMetadata("topic", 0, 1);
+        context.setTimestamp(recordTime2);
+        harness.processor.process(new Record<>(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE, recordTime2));
         assertThat(context.forwarded(), hasSize(0));
 
         // ok, now it's time to emit "hey"
         final long windowStart3 = 101L;
         final long recordTime3 = 101L;
         final long windowEnd3 = 102L;
-        context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime3);
-        harness.processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
+        context.setRecordMetadata("topic", 0, 1);
+        context.setTimestamp(recordTime3);
+        harness.processor.process(new Record<>(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE, recordTime3));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(recordTime));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, recordTime)));
     }
 
     /**
@@ -199,43 +204,44 @@ public class KTableSuppressProcessorTest {
     public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         // note the record is in the past, but the window end is in the future, so we still have to buffer,
         // even though the grace period is 0.
         final long timestamp = 5L;
         final long windowEnd = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
         assertThat(context.forwarded(), hasSize(0));
 
-        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), windowEnd);
-        harness.processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
+        context.setRecordMetadata("", 0, 1L);
+        context.setTimestamp(windowEnd);
+        harness.processor.process(new Record<>(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE, windowEnd));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     @Test
     public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     /**
@@ -246,13 +252,14 @@ public class KTableSuppressProcessorTest {
     public void finalResultsShouldDropTombstonesForTimeWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(0));
     }
@@ -266,13 +273,14 @@ public class KTableSuppressProcessorTest {
     public void finalResultsShouldDropTombstonesForSessionWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(0));
     }
@@ -285,18 +293,20 @@ public class KTableSuppressProcessorTest {
     public void suppressShouldNotDropTombstonesForTimeWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        final Headers headers = new RecordHeaders().add("k", "v".getBytes(StandardCharsets.UTF_8));
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
+        context.setHeaders(headers);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp, headers)));
     }
 
 
@@ -308,18 +318,18 @@ public class KTableSuppressProcessorTest {
     public void suppressShouldNotDropTombstonesForSessionWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
 
@@ -331,78 +341,82 @@ public class KTableSuppressProcessorTest {
     public void suppressShouldNotDropTombstonesForKTable() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     @Test
     public void suppressShouldEmitWhenOverRecordCapacity() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1)), String(), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
-        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
-        harness.processor.process("dummyKey", value);
+        context.setRecordMetadata("", 0, 1L);
+        context.setTimestamp(timestamp + 1);
+        harness.processor.process(new Record<>("dummyKey", value, timestamp + 1));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     @Test
     public void suppressShouldEmitWhenOverByteCapacity() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L)), String(), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
-        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
-        harness.processor.process("dummyKey", value);
+        context.setRecordMetadata("", 0, 1L);
+        context.setTimestamp(timestamp + 1);
+        harness.processor.process(new Record<>("dummyKey", value, timestamp + 1));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
-        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.record(), is(new Record<>(key, value, timestamp)));
     }
 
     @Test
     public void suppressShouldShutDownWhenOverRecordCapacity() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull()), String(), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
-        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 1L);
+        context.setTimestamp(timestamp);
         try {
-            harness.processor.process("dummyKey", value);
+            harness.processor.process(new Record<>("dummyKey", value, timestamp));
             fail("expected an exception");
         } catch (final StreamsException e) {
             assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
@@ -413,18 +427,20 @@ public class KTableSuppressProcessorTest {
     public void suppressShouldShutDownWhenOverByteCapacity() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull()), String(), Long());
-        final MockInternalProcessorContext context = harness.context;
+        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 0L);
+        context.setTimestamp(timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        harness.processor.process(key, value);
+        harness.processor.process(new Record<>(key, value, timestamp));
 
-        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
+        context.setRecordMetadata("", 0, 1L);
+        context.setTimestamp(1L);
         try {
-            harness.processor.process("dummyKey", value);
+            harness.processor.process(new Record<>("dummyKey", value, timestamp));
             fail("expected an exception");
         } catch (final StreamsException e) {
             assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
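
Note (not part of the patch): the recurring migration pattern in the tests
above, in isolation. Record metadata and timestamp are now set through
separate calls on the mock context, input travels as a Record, and
assertions compare whole Records instead of keyValue() plus timestamp().
The forwarding processor below is an invented stand-in for the suppress
processor:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.MockProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.test.MockInternalNewProcessorContext;

    public class MigratedTestPattern {
        public static void main(final String[] args) {
            final MockInternalNewProcessorContext<String, Long> context =
                new MockInternalNewProcessorContext<>();

            final ContextualProcessor<String, Long, String, Long> processor =
                new ContextualProcessor<String, Long, String, Long>() {
                    @Override
                    public void process(final Record<String, Long> record) {
                        context().forward(record);
                    }
                };
            processor.init(context);

            // Metadata and timestamp are set separately on the new mock.
            context.setRecordMetadata("topic", 0, 0L);
            context.setTimestamp(100L);

            processor.process(new Record<>("key", 1L, 100L));

            final MockProcessorContext.CapturedForward capturedForward =
                context.forwarded().get(0);
            // true: the whole Record round-trips through the forward.
            System.out.println(capturedForward.record().equals(new Record<>("key", 1L, 100L)));
        }
    }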
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 26e9488..4c74df2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -144,7 +145,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final MockInternalProcessorContext context = makeContext();
         buffer.init((StateStoreContext) context, buffer);
         try {
-            buffer.put(0, "asdf", null, getContext(0));
+            buffer.put(0, new Record<>("asdf", null, 0L), getContext(0));
             fail("expected an exception");
         } catch (final NullPointerException expected) {
             // expected
@@ -284,8 +285,8 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final ProcessorRecordContext recordContext = getContext(0L);
         context.setRecordContext(recordContext);
-        buffer.put(1L, "A", new Change<>("new-value", "old-value"), recordContext);
-        buffer.put(1L, "B", new Change<>("new-value", null), recordContext);
+        buffer.put(1L, new Record<>("A", new Change<>("new-value", "old-value"), 0L), recordContext);
+        buffer.put(1L, new Record<>("B", new Change<>("new-value", null), 0L), recordContext);
         assertThat(buffer.priorValueForBuffered("A"), is(Maybe.defined(ValueAndTimestamp.make("old-value", -1))));
         assertThat(buffer.priorValueForBuffered("B"), is(Maybe.defined(null)));
     }
@@ -1012,7 +1013,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                   final String value) {
         final ProcessorRecordContext recordContext = getContext(recordTimestamp);
         context.setRecordContext(recordContext);
-        buffer.put(streamTime, key, new Change<>(value, null), recordContext);
+        buffer.put(streamTime, new Record<>(key, new Change<>(value, null), 0L), recordContext);
     }
 
     private static BufferValue getBufferValue(final String value, final long timestamp) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
new file mode 100644
index 0000000..ffb503e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
+
+import java.io.File;
+import java.util.Properties;
+
+public class MockInternalNewProcessorContext<KOut, VOut> extends MockProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut, VOut> {
+
+    private ProcessorNode currentNode;
+    private long currentSystemTimeMs;
+    private TaskType taskType = TaskType.ACTIVE;
+
+    private long timestamp = 0;
+    private Headers headers = new RecordHeaders();
+
+    public MockInternalNewProcessorContext() {
+    }
+
+    public MockInternalNewProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        super(config, taskId, stateDir);
+    }
+
+    @Override
+    public void setSystemTimeMs(long timeMs) {
+        currentSystemTimeMs = timeMs;
+    }
+
+    @Override
+    public long currentSystemTimeMs() {
+        return currentSystemTimeMs;
+    }
+
+    @Override
+    public long currentStreamTimeMs() {
+        return 0;
+    }
+
+    @Override
+    public StreamsMetricsImpl metrics() {
+        return (StreamsMetricsImpl) super.metrics();
+    }
+
+    @Override
+    public ProcessorRecordContext recordContext() {
+        return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
+    }
+
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        setRecordMetadata(
+            recordContext.topic(),
+            recordContext.partition(),
+            recordContext.offset()
+        );
+        this.headers = recordContext.headers();
+        this.timestamp = recordContext.timestamp();
+    }
+
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public void setHeaders(final Headers headers) {
+        this.headers = headers;
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        this.currentNode = currentNode;
+    }
+
+    @Override
+    public ProcessorNode currentNode() {
+        return currentNode;
+    }
+
+    @Override
+    public ThreadCache cache() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {}
+
+    @Override
+    public void uninitialize() {}
+
+    @Override
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+        addStateStore(store);
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value) {
+        throw new UnsupportedOperationException("Migrate to new implementation");
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value, To to) {
+        throw new UnsupportedOperationException("Migrate to new implementation");
+    }
+
+    @Override
+    public String topic() {
+        if (recordMetadata().isPresent()) return recordMetadata().get().topic();
+        else return null;
+    }
+
+    @Override
+    public int partition() {
+        if (recordMetadata().isPresent()) return recordMetadata().get().partition();
+        else return 0;
+    }
+
+    @Override
+    public long offset() {
+        if (recordMetadata().isPresent()) return recordMetadata().get().offset();
+        else return 0;
+    }
+
+    @Override
+    public Headers headers() {
+        return headers;
+    }
+
+    @Override
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public TaskType taskType() {
+        return taskType;
+    }
+
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+    }
+
+    @Override
+    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
+    }
+
+    @Override
+    public void transitionToStandby(final ThreadCache newCache) {
+    }
+
+    @Override
+    public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) {
+    }
+
+    @Override
+    public <T extends StateStore> T getStateStore(StoreBuilder<T> builder) {
+        return getStateStore(builder.name());
+    }
+
+    @Override
+    public String changelogFor(final String storeName) {
+        return "mock-changelog";
+    }
+}
\ No newline at end of file
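
Note (not part of the patch): the new org.apache.kafka.streams.processor.api
MockProcessorContext carries only topic, partition, and offset in its record
metadata, so the mock above stores timestamp and headers in its own fields
and reassembles a ProcessorRecordContext on demand. A short usage sketch:

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
    import org.apache.kafka.test.MockInternalNewProcessorContext;

    public class MockContextSketch {
        public static void main(final String[] args) {
            final MockInternalNewProcessorContext<String, Long> context =
                new MockInternalNewProcessorContext<>();
            context.setRecordMetadata("topic", 0, 5L);
            context.setTimestamp(100L);
            context.setHeaders(new RecordHeaders());

            // recordContext() stitches the pieces back together.
            final ProcessorRecordContext recordContext = context.recordContext();
            System.out.println(recordContext.topic() + "-" + recordContext.partition()
                + "@" + recordContext.offset() + ", ts=" + recordContext.timestamp());
        }
    }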
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 0c81e3e..061c1fb 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -125,9 +125,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     public static class CapturedForward {
         private final String childName;
         private final long timestamp;
+        private final Headers headers;
         private final KeyValue keyValue;
 
-        private CapturedForward(final To to, final KeyValue keyValue) {
+        private CapturedForward(final KeyValue keyValue, final To to, final Headers headers) {
             if (keyValue == null) {
                 throw new IllegalArgumentException();
             }
@@ -135,6 +136,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
             this.childName = to.childName;
             this.timestamp = to.timestamp;
             this.keyValue = keyValue;
+            this.headers = headers;
         }
 
         /**
@@ -175,6 +177,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
                 ", keyValue=" + keyValue +
                 '}';
         }
+
+        public Headers headers() {
+            return this.headers;
+        }
     }
 
     // constructors ================================================
@@ -498,8 +504,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     public <K, V> void forward(final K key, final V value, final To to) {
         capturedForwards.add(
             new CapturedForward(
+                new KeyValue<>(key, value),
                 to.timestamp == -1 ? to.withTimestamp(recordTimestamp == null ? -1 : recordTimestamp) : to,
-                new KeyValue<>(key, value)
+                headers
             )
         );
     }
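
Note (not part of the patch): with the change above, the old test-utils
MockProcessorContext now records the context headers alongside each
captured forward. A sketch of reading them back; values are illustrative:

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.streams.processor.MockProcessorContext;

    public class CapturedHeadersSketch {
        public static void main(final String[] args) {
            final MockProcessorContext context = new MockProcessorContext();
            context.setRecordMetadata("topic", 0, 0L, new RecordHeaders(), 100L);

            context.forward("key", 42L);

            final MockProcessorContext.CapturedForward capturedForward =
                context.forwarded().get(0);
            // The headers captured at forward time, via the new accessor.
            final Headers headers = capturedForward.headers();
            System.out.println(headers);
        }
    }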