You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/01 17:06:19 UTC

[kafka] 02/02: move metadata to context

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-4-record-processor
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d3321aa0d199c80b04ab9e1121b4501b333db7a5
Author: John Roesler <vv...@apache.org>
AuthorDate: Thu Oct 1 12:06:18 2020 -0500

    move metadata to context
---
 .../examples/docs/DeveloperGuideTesting.java       | 10 ++++-----
 .../kafka/streams/processor/api/Processor.java     |  9 +-------
 .../streams/processor/api/ProcessorContext.java    | 10 +++++++++
 .../internals/AbstractProcessorContext.java        |  7 ++++++
 .../internals/GlobalProcessorContextImpl.java      |  5 +----
 .../processor/internals/GlobalStateUpdateTask.java |  6 +----
 .../processor/internals/ProcessorAdapter.java      |  5 +----
 .../processor/internals/ProcessorContextImpl.java  |  6 +----
 .../streams/processor/internals/ProcessorNode.java |  8 +++----
 .../streams/processor/internals/SinkNode.java      | 26 ++++++----------------
 .../streams/processor/internals/SourceNode.java    |  5 +----
 .../streams/processor/internals/StreamTask.java    |  5 +----
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  6 ++---
 .../apache/kafka/streams/StreamsBuilderTest.java   |  4 +---
 .../org/apache/kafka/streams/TopologyTest.java     |  4 +---
 .../internals/GlobalProcessorContextImplTest.java  |  2 +-
 .../processor/internals/ProcessorNodeTest.java     |  6 ++---
 .../processor/internals/ProcessorTopologyTest.java | 14 +++++-------
 .../streams/processor/internals/SinkNodeTest.java  |  2 +-
 .../processor/internals/StreamTaskTest.java        |  6 ++---
 .../processor/internals/StreamThreadTest.java      |  4 +---
 .../kafka/test/InternalMockProcessorContext.java   |  5 ++---
 .../org/apache/kafka/test/MockApiProcessor.java    |  4 +---
 .../kafka/test/MockInternalProcessorContext.java   |  7 ++++++
 .../java/org/apache/kafka/test/MockProcessor.java  |  7 +-----
 .../org/apache/kafka/test/MockProcessorNode.java   |  6 ++---
 .../java/org/apache/kafka/test/MockSourceNode.java |  4 +---
 .../kafka/streams/TopologyTestDriverTest.java      | 19 ++++++++--------
 28 files changed, 78 insertions(+), 124 deletions(-)

diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index 1ce3445..72e704e 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -21,26 +21,24 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -170,7 +168,7 @@ public class DeveloperGuideTesting {
         }
 
         @Override
-        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, Long> record) {
             final Long oldValue = store.get(record.key());
             if (oldValue == null || record.value() > oldValue) {
                 store.put(record.key(), record.value());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
index e6feccb..167976b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.time.Duration;
-import java.util.Optional;
 
 /**
  * A processor of key-value pair records.
@@ -50,14 +49,8 @@ public interface Processor<KIn, VIn, KOut, VOut> {
      * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
      *
      * @param record the record to process
-     * @param recordMetadata the metadata of the record, if it is defined. Note that as long as the processor is
-     *                       receiving a record downstream of a Source (i.e., the current record is coming from an
-     *                       input topic), the metadata is defined. On the other hand, if a parent processor has
-     *                       registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
-     *                       punctuator, then there is no record from an input topic, and therefore the metadata
-     *                       would be undefined.
      */
-    void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata);
+    void process(Record<KIn, VIn> record);
 
     /**
      * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index f4f0fdb..a431993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -53,6 +53,16 @@ public interface ProcessorContext<KForward, VForward> {
     TaskId taskId();
 
     /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is
+     * receiving a record downstream of a Source (i.e., the current record is coming from an
+     * input topic), the metadata is defined. On the other hand, if a parent processor has
+     * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
+     * punctuator, then there is no record from an input topic, and therefore the metadata
+     * would be undefined.
+     */
+    Optional<RecordMetadata> recordMetadata();
+
+    /**
      * Returns the default key serde.
      *
      * @return the key serializer
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index ef222e0..c29614a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -31,6 +32,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
@@ -201,6 +203,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordContext);
+    }
+
+    @Override
     public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
         this.currentNode = currentNode;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 9f31309..3468833 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -25,13 +25,11 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.time.Duration;
-import java.util.Optional;
 
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
@@ -63,11 +61,10 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     @Override
     public <K, V> void forward(final Record<K, V> record) {
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
-        final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
         try {
             for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<K, V, ?, ?>) child).process(record, recordMetadata);
+                ((ProcessorNode<K, V, ?, ?>) child).process(record);
             }
         } finally {
             setCurrentNode(previousNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 360e50e..6b1378b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
@@ -112,10 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(
-                toProcess,
-                Optional.of(recordContext)
-            );
+            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
         }
 
         offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index 291a99e..84c8602 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -20,9 +20,6 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-
-import java.util.Optional;
 
 public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
     private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
@@ -57,7 +54,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         final ProcessorRecordContext processorRecordContext = context.recordContext();
         try {
             context.setRecordContext(new ProcessorRecordContext(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 309b813..d164428 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -38,7 +37,6 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
@@ -263,9 +261,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                         final Record<K, V> record) {
         setCurrentNode(child);
 
-        final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext);
-
-        child.process(record, recordMetadata);
+        child.process(record);
 
         if (child.isTerminalNode()) {
             streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 2939525..38daa52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -20,11 +20,10 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
@@ -32,7 +31,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -176,11 +174,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
     }
 
 
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         throwIfClosed();
 
         try {
-            maybeMeasureLatency(() -> processor.process(record, recordMetadata), time, processSensor);
+            maybeMeasureLatency(() -> processor.process(record), time, processSensor);
         } catch (final ClassCastException e) {
             final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
             final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index f8840e4..813bcb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -22,9 +22,6 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-
-import java.util.Optional;
 
 public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
@@ -81,7 +78,7 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
 
         final KIn key = record.key();
@@ -96,22 +93,13 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
             );
         }
 
-        // Prefer the record metadata if defined,
-        // and fall back to the context (which is undefined and dummy values,
-        // but extractors may still depend on the current behavior.
-        final Optional<ProcessorRecordContext> maybeContext =
-            recordMetadata.map(
-                m -> new ProcessorRecordContext(timestamp, m.offset(), m.partition(), m.topic(), record.headers())
-            );
         final ProcessorRecordContext contextForExtraction =
-            maybeContext.orElseGet(
-                () -> new ProcessorRecordContext(
-                    timestamp,
-                    context.offset(),
-                    context.partition(),
-                    context.topic(),
-                    record.headers()
-                )
+            new ProcessorRecordContext(
+                timestamp,
+                context.offset(),
+                context.partition(),
+                context.topic(),
+                record.headers()
             );
 
         final String topic = topicExtractor.extract(key, value, contextForExtraction);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 711b4c3..7fa8c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -22,11 +22,8 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
-import java.util.Optional;
-
 public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private InternalProcessorContext context;
@@ -96,7 +93,7 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
 
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         context.forward(record);
         processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c4e4ff3..464ac9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@@ -56,7 +55,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -692,8 +690,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext());
-            maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor);
+            maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
 
             log.trace("Completed processing one record [{}]", record);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 7ff2c6c..38baeb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -78,7 +77,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Executors;
@@ -869,7 +867,7 @@ public class KafkaStreamsTest {
                     }
 
                     @Override
-                    public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                    public void process(final Record<String, String> record) {
                         if (record.value().length() % 2 == 0) {
                             context.forward(record.withValue(record.key() + record.value()));
                         }
@@ -970,7 +968,7 @@ public class KafkaStreamsTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
                     kvStore.put(record.key(), 5L);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index b308b4f..415aaea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -60,7 +59,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
 import static java.util.Arrays.asList;
@@ -109,7 +107,7 @@ public class StreamsBuilderTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     store.put(record.key(), record.value());
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 9e9f415..ef9becf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -46,7 +45,6 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -379,7 +377,7 @@ public class TopologyTest {
                 }
 
                 @Override
-                public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { }
+                public void process(final Record<Object, Object> record) { }
             };
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 6322fd2..a83c92b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -100,7 +100,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldForwardToSingleChild() {
-        child.process(anyObject(), anyObject());
+        child.process(anyObject());
         expectLastCall();
 
         expect(recordContext.timestamp()).andStubReturn(0L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index a4efcbc..8ff9451 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.Optional;
-import java.util.Properties;
-
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -40,6 +37,7 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -208,7 +206,7 @@ public class ProcessorNodeTest {
         node.init(context);
         final StreamsException se = assertThrows(
             StreamsException.class,
-            () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()), Optional.ofNullable(context.recordContext()))
+            () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()))
         );
         assertThat(se.getCause(), instanceOf(ClassCastException.class));
         assertThat(se.getMessage(), containsString("default Serdes"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 0b7c1b3..07ad044 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -55,7 +54,6 @@ import java.io.File;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.function.Supplier;
@@ -777,7 +775,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record);
         }
     }
@@ -794,7 +792,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record.withTimestamp(record.timestamp() + 10));
         }
     }
@@ -816,7 +814,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record);
             context.forward(record.withTimestamp(record.timestamp() + 5), firstChild);
             context.forward(record, secondChild);
@@ -833,7 +831,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record.withHeaders(record.headers().add(HEADER)));
         }
     }
@@ -851,7 +849,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             context.forward(record.withValue(record.value().split("@")[0]));
         }
     }
@@ -936,7 +934,7 @@ public class ProcessorTopologyTest {
         }
 
         @Override
-        public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, String> record) {
             store.put(record.key(), record.value());
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index bc6f08b..c877ab5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -52,7 +52,7 @@ public class SinkNodeTest {
         // When/Then
         context.setTime(-1); // ensures a negative timestamp is set for the record we send next
         try {
-            illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()), java.util.Optional.empty());
+            illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()));
             fail("Should have thrown StreamsException");
         } catch (final StreamsException ignored) {
             // expected
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index b54aa6c..e620be7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -80,7 +79,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -135,7 +133,7 @@ public class StreamTaskTest {
     private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
         @Override
-        public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<Integer, Integer> record) {
             throw new RuntimeException("KABOOM!");
         }
 
@@ -471,7 +469,7 @@ public class StreamTaskTest {
             }
 
             @Override
-            public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) {
+            public void process(final Record<Integer, Integer> record) {
                 if (record.key() % 2 == 0) {
                     context.forward(record);
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6520778..4ee44ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -64,7 +64,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -94,7 +93,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -1205,7 +1203,7 @@ public class StreamThreadTest {
             "proc",
             () -> new Processor<Object, Object, Object, Object>() {
                 @Override
-                public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<Object, Object> record) {
                     if (shouldThrow.get()) {
                         throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition))));
                     } else {
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index bd82e29..6086f97 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -55,7 +55,6 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
 
@@ -300,7 +299,7 @@ public class InternalMockProcessorContext
         try {
             for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
                 currentNode = childNode;
-                ((ProcessorNode<K, V, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+                ((ProcessorNode<K, V, ?, ?>) childNode).process(record);
             }
         } finally {
             currentNode = thisNode;
@@ -337,7 +336,7 @@ public class InternalMockProcessorContext
                 if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
                     currentNode = childNode;
                     final Record<Object, Object> record = new Record<>(key, value, toInternal.timestamp(), headers());
-                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext));
+                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record);
                     toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
                                            // Processors and toInternal might have been modified
                 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
index 8bed338..dd56bad 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.time.Duration;
@@ -30,7 +29,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -74,7 +72,7 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         final KIn key = record.key();
         final VIn value = record.value();
         final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp());
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 82b24d1..370dca7 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.io.File;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
@@ -84,6 +86,11 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     }
 
     @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.of(recordContext());
+    }
+
+    @Override
     public void setRecordContext(final ProcessorRecordContext recordContext) {
         setRecordMetadata(
             recordContext.topic(),
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 6c653c3..f18b763 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -22,17 +22,14 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     private final MockApiProcessor<K, V, Object, Object> delegate;
-    private InternalProcessorContext internalProcessorContext;
 
     public MockProcessor(final PunctuationType punctuationType,
                          final long scheduleInterval) {
@@ -47,14 +44,12 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     @Override
     public void init(final ProcessorContext context) {
         super.init(context);
-        internalProcessorContext = (InternalProcessorContext) context;
         delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context);
     }
 
     @Override
     public void process(final K key, final V value) {
-        final Record<K, V> record = new Record<>(key, value, context.timestamp(), context.headers());
-        delegate.process(record, Optional.ofNullable(internalProcessorContext.recordContext()));
+        delegate.process(new Record<>(key, value, context.timestamp(), context.headers()));
     }
 
     public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 90fd905..a75c250 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -18,12 +18,10 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 import java.util.Collections;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
@@ -61,8 +59,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
-        processor().process(record, recordMetadata);
+    public void process(final Record<KIn, VIn> record) {
+        processor().process(record);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 4c3fed1..9d22e3b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -18,12 +18,10 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
 import java.util.ArrayList;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
@@ -42,7 +40,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
     }
 
     @Override
-    public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) {
+    public void process(final Record<KIn, VIn> record) {
         numReceived++;
         keys.add(record.key());
         values.add(record.value());
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 48783a6..fd9fb76 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -72,7 +72,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -266,14 +265,14 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<Object, Object> record) {
             processedRecords.add(new TTDTestRecord(
                 record.key(),
                 record.value(),
                 record.headers(),
                 record.timestamp(),
-                recordMetadata.map(RecordMetadata::offset).orElse(-1L),
-                recordMetadata.map(RecordMetadata::topic).orElse(null)
+                context.recordMetadata().map(RecordMetadata::offset).orElse(-1L),
+                context.recordMetadata().map(RecordMetadata::topic).orElse(null)
             ));
             context.forward(record);
         }
@@ -408,7 +407,7 @@ public class TopologyTestDriverTest {
                     }
 
                     @Override
-                    public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) {
+                    public void process(final Record<Object, Object> record) {
                         store.put(record.key(), record.value());
                     }
                 }
@@ -1461,7 +1460,7 @@ public class TopologyTestDriverTest {
         }
 
         @Override
-        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+        public void process(final Record<String, Long> record) {
             final Long oldValue = store.get(record.key());
             if (oldValue == null || record.value() > oldValue) {
                 store.put(record.key(), record.value());
@@ -1514,7 +1513,7 @@ public class TopologyTestDriverTest {
                         }
 
                         @Override
-                        public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) {
+                        public void process(final Record<String, Long> record) {
                             store.put(record.key(), record.value());
                         }
                     };
@@ -1703,7 +1702,7 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     final String value = record.value();
                     if (!value.startsWith("recurse-")) {
                         context.forward(record.withValue("recurse-" + value), "recursiveSink");
@@ -1761,7 +1760,7 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     stateStore.put(record.key(), record.value());
                 }
             }
@@ -1777,7 +1776,7 @@ public class TopologyTestDriverTest {
                 }
 
                 @Override
-                public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) {
+                public void process(final Record<String, String> record) {
                     final String value = record.value();
                     if (!value.startsWith("recurse-")) {
                         context.forward(record.withValue("recurse-" + value), "recursiveSink");