You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/02/12 04:26:51 UTC

[kafka] branch trunk updated: KAFKA-12939: After migrating processors, search the codebase for missed migrations (#11534)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9931036  KAFKA-12939: After migrating processors, search the codebase for missed migrations (#11534)
9931036 is described below

commit 99310360a589ccb28f7636b714e98e2e7c5a5110
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Sat Feb 12 04:25:03 2022 +0000

    KAFKA-12939: After migrating processors, search the codebase for missed migrations (#11534)
    
    Migrated internal usages that had previously been marked with TODO suppressions.
    
    Reviewer: John Roesler<vv...@apache.org>
---
 .../kstream/internals/KStreamFlatTransform.java    | 23 ++++--
 .../internals/KStreamFlatTransformValues.java      | 23 ++++--
 .../kstream/internals/KStreamTransformValues.java  | 29 ++++---
 .../processor/internals/RecordDeserializer.java    |  9 +-
 .../streams/processor/internals/RecordQueue.java   |  9 +-
 .../kafka/streams/state/ValueAndTimestamp.java     |  4 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   | 32 +++++---
 .../integration/GlobalKTableIntegrationTest.java   |  7 +-
 .../integration/GlobalThreadShutDownOrderTest.java | 16 ++--
 .../integration/RangeQueryIntegrationTest.java     | 55 ++++++-------
 .../integration/RestoreIntegrationTest.java        | 25 ++----
 .../integration/StoreUpgradeIntegrationTest.java   | 73 +++++++----------
 .../integration/TaskMetadataIntegrationTest.java   | 10 +--
 .../kstream/RepartitionTopicNamingTest.java        | 81 +++++++++++-------
 .../kstream/internals/AbstractStreamTest.java      | 20 +++--
 .../kstream/internals/GlobalKTableJoinsTest.java   | 11 +--
 .../kstream/internals/KGroupedStreamImplTest.java  | 92 ++++++++++-----------
 .../kstream/internals/KStreamFilterTest.java       |  9 +-
 .../kstream/internals/KStreamFlatMapTest.java      |  7 +-
 .../internals/KStreamFlatMapValuesTest.java        |  8 +-
 .../internals/KStreamFlatTransformTest.java        | 21 ++---
 .../internals/KStreamFlatTransformValuesTest.java  | 21 ++---
 .../internals/KStreamGlobalKTableJoinTest.java     |  9 +-
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  9 +-
 .../streams/kstream/internals/KStreamImplTest.java |  9 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  | 60 +++++++-------
 .../internals/KStreamKStreamLeftJoinTest.java      | 56 ++++++-------
 .../internals/KStreamKStreamOuterJoinTest.java     | 51 ++++++------
 .../kstream/internals/KStreamKTableJoinTest.java   |  9 +-
 .../internals/KStreamKTableLeftJoinTest.java       |  9 +-
 .../streams/kstream/internals/KStreamMapTest.java  |  5 +-
 .../kstream/internals/KStreamMapValuesTest.java    |  6 +-
 .../kstream/internals/KStreamSelectKeyTest.java    |  6 +-
 .../KStreamSlidingWindowAggregateTest.java         | 62 ++++++--------
 .../internals/KStreamTransformValuesTest.java      | 12 +--
 .../internals/KStreamWindowAggregateTest.java      | 25 +++---
 .../kstream/internals/KTableFilterTest.java        |  7 +-
 .../streams/kstream/internals/KTableImplTest.java  | 14 ++--
 .../kstream/internals/KTableMapKeysTest.java       |  5 +-
 .../kstream/internals/KTableMapValuesTest.java     |  9 +-
 .../kstream/internals/KTableSourceTest.java        |  4 +-
 .../internals/SessionWindowedKStreamImplTest.java  | 11 ++-
 .../internals/SlidingWindowedKStreamImplTest.java  | 11 ++-
 .../internals/TimeWindowedKStreamImplTest.java     | 11 ++-
 .../internals/graph/GraphGraceSearchUtilTest.java  | 29 +++----
 .../internals/graph/TableProcessorNodeTest.java    | 10 ++-
 .../processor/internals/ProcessorNodeTest.java     | 52 +++++-------
 .../processor/internals/PunctuationQueueTest.java  | 48 ++---------
 .../internals/RepartitionOptimizingTest.java       | 12 ++-
 .../processor/internals/StreamThreadTest.java      | 23 +++---
 .../apache/kafka/streams/tests/EosTestClient.java  |  1 -
 .../kafka/streams/tests/SmokeTestClient.java       | 12 ++-
 .../apache/kafka/streams/tests/SmokeTestUtil.java  | 95 +++++++++++-----------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  1 -
 54 files changed, 592 insertions(+), 676 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
index 4d4fd2b..dc66cf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
@@ -19,12 +19,17 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Set;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {
+public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
 
     private final TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;
 
@@ -33,7 +38,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.ka
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
+    public Processor<KIn, VIn, KOut, VOut> get() {
         return new KStreamFlatTransformProcessor<>(transformerSupplier.get());
     }
 
@@ -42,7 +47,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.ka
         return transformerSupplier.stores();
     }
 
-    public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {
+    public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends ContextualProcessor<KIn, VIn, KOut, VOut> {
 
         private final Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;
 
@@ -51,17 +56,17 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.ka
         }
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<KOut, VOut> context) {
             super.init(context);
-            transformer.init(context);
+            transformer.init((InternalProcessorContext<KOut, VOut>) context);
         }
 
         @Override
-        public void process(final KIn key, final VIn value) {
-            final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(key, value);
+        public void process(final Record<KIn, VIn> record) {
+            final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(record.key(), record.value());
             if (pairs != null) {
                 for (final KeyValue<KOut, VOut> pair : pairs) {
-                    context().forward(pair.key, pair.value);
+                    context().forward(record.withKey(pair.key).withValue(pair.value));
                 }
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
index baf44c7..5469c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
@@ -18,13 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Set;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {
+public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
 
     private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier;
 
@@ -33,7 +38,7 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.ka
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
+    public Processor<KIn, VIn, KIn, VOut> get() {
         return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
     }
 
@@ -42,7 +47,7 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.ka
         return valueTransformerSupplier.stores();
     }
 
-    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {
+    public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {
 
         private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
 
@@ -51,17 +56,17 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.ka
         }
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<KIn, VOut> context) {
             super.init(context);
-            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
+            valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext<KIn, VOut>) context));
         }
 
         @Override
-        public void process(final KIn key, final VIn value) {
-            final Iterable<VOut> transformedValues = valueTransformer.transform(key, value);
+        public void process(final Record<KIn, VIn> record) {
+            final Iterable<VOut> transformedValues = valueTransformer.transform(record.key(), record.value());
             if (transformedValues != null) {
                 for (final VOut transformedValue : transformedValues) {
-                    context.forward(key, transformedValue);
+                    context().forward(record.withValue(transformedValue));
                 }
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 468bf8d..1b767ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -18,22 +18,27 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Set;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KStreamTransformValues<K, V, R> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
+public class KStreamTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
 
-    private final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier;
+    private final ValueTransformerWithKeySupplier<KIn, VIn, VOut> valueTransformerSupplier;
 
-    KStreamTransformValues(final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier) {
+    KStreamTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn, VOut> valueTransformerSupplier) {
         this.valueTransformerSupplier = valueTransformerSupplier;
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, V> get() {
+    public Processor<KIn, VIn, KIn, VOut> get() {
         return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
     }
 
@@ -42,23 +47,23 @@ public class KStreamTransformValues<K, V, R> implements org.apache.kafka.streams
         return valueTransformerSupplier.stores();
     }
 
-    public static class KStreamTransformValuesProcessor<K, V, R> extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+    public static class KStreamTransformValuesProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {
 
-        private final ValueTransformerWithKey<K, V, R> valueTransformer;
+        private final ValueTransformerWithKey<KIn, VIn, VOut> valueTransformer;
 
-        KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer) {
+        KStreamTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, VOut> valueTransformer) {
             this.valueTransformer = valueTransformer;
         }
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<KIn, VOut> context) {
             super.init(context);
-            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
+            valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext<KIn, VOut>) context));
         }
 
         @Override
-        public void process(final K key, final V value) {
-            context.forward(key, valueTransformer.transform(key, value));
+        public void process(final Record<KIn, VIn> record) {
+            context().forward(record.withValue(valueTransformer.transform(record.key(), record.value())));
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index b5c821a..6ffe955 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.slf4j.Logger;
 
 import java.util.Optional;
@@ -50,7 +50,7 @@ class RecordDeserializer {
      *                          {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL}
      *                          or throws an exception itself
      */
-    ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
+    ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processorContext,
                                                final ConsumerRecord<byte[], byte[]> rawRecord) {
 
         try {
@@ -70,7 +70,10 @@ class RecordDeserializer {
         } catch (final Exception deserializationException) {
             final DeserializationExceptionHandler.DeserializationHandlerResponse response;
             try {
-                response = deserializationExceptionHandler.handle(processorContext, rawRecord, deserializationException);
+                response = deserializationExceptionHandler.handle(
+                    (InternalProcessorContext<?, ?>) processorContext,
+                    rawRecord,
+                    deserializationException);
             } catch (final Exception fatalUserException) {
                 log.error(
                     "Deserialization error callback failed after deserialization error for record {}",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index db27f0b..1c01966 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
 
@@ -41,7 +41,7 @@ public class RecordQueue {
     private final Logger log;
     private final SourceNode<?, ?> source;
     private final TopicPartition partition;
-    private final ProcessorContext processorContext;
+    private final ProcessorContext<?, ?> processorContext;
     private final TimestampExtractor timestampExtractor;
     private final RecordDeserializer recordDeserializer;
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
@@ -55,7 +55,7 @@ public class RecordQueue {
                 final SourceNode<?, ?> source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler deserializationExceptionHandler,
-                final InternalProcessorContext processorContext,
+                final InternalProcessorContext<?, ?> processorContext,
                 final LogContext logContext) {
         this.source = source;
         this.partition = partition;
@@ -175,7 +175,8 @@ public class RecordQueue {
 
         while (headRecord == null && !fifoQueue.isEmpty()) {
             final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
-            final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext, raw);
+            final ConsumerRecord<Object, Object> deserialized =
+                recordDeserializer.deserialize(processorContext, raw);
 
             if (deserialized == null) {
                 // this only happens if the deserializer decides to skip. It has already logged the reason.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index f5fc7a2..1cf7277 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -37,12 +37,12 @@ public final class ValueAndTimestamp<V> {
     }
 
     /**
-     * Create a new {@link ValueAndTimestamp} instance if the provide {@code value} is not {@code null}.
+     * Create a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null}.
      *
      * @param value      the value
      * @param timestamp  the timestamp
      * @param <V> the type of the value
-     * @return a new {@link ValueAndTimestamp} instance if the provide {@code value} is not {@code null};
+     * @return a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null};
      *         otherwise {@code null} is returned
      */
     public static <V> ValueAndTimestamp<V> make(final V value,
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 0113e56..06854db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -45,11 +45,10 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockPredicate;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.NoopValueTransformer;
 import org.apache.kafka.test.NoopValueTransformerWithKey;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -77,7 +76,6 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class StreamsBuilderTest {
 
     private static final String STREAM_TOPIC = "stream-topic";
@@ -301,7 +299,7 @@ public class StreamsBuilderTest {
         final KStream<String, String> source = builder.stream("topic-source");
         source.to("topic-sink");
 
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
         source.process(processorSupplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -321,10 +319,10 @@ public class StreamsBuilderTest {
         final KStream<String, String> source = builder.stream("topic-source");
         final KStream<String, String> through = source.through("topic-sink");
 
-        final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> sourceProcessorSupplier = new MockApiProcessorSupplier<>();
         source.process(sourceProcessorSupplier);
 
-        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> throughProcessorSupplier = new MockApiProcessorSupplier<>();
         through.process(throughProcessorSupplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -342,10 +340,10 @@ public class StreamsBuilderTest {
         final KStream<String, String> source = builder.stream("topic-source");
         final KStream<String, String> through = source.repartition();
 
-        final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> sourceProcessorSupplier = new MockApiProcessorSupplier<>();
         source.process(sourceProcessorSupplier);
 
-        final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> throughProcessorSupplier = new MockApiProcessorSupplier<>();
         through.process(throughProcessorSupplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -367,7 +365,7 @@ public class StreamsBuilderTest {
         final KStream<String, String> source2 = builder.stream(topic2);
         final KStream<String, String> merged = source1.merge(source2);
 
-        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
         merged.process(processorSupplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -707,6 +705,7 @@ public class StreamsBuilderTest {
                                 STREAM_OPERATION_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAddThirdStateStoreIfStreamStreamJoinFixIsDisabledViaOldApi() {
         final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
@@ -798,12 +797,16 @@ public class StreamsBuilderTest {
                                 STREAM_OPERATION_NAME + "-merge");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKStream() {
         final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
         final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
 
-        streamOne.join(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
+        streamOne.join(streamTwo,
+            (value1, value2) -> value1,
+            JoinWindows.of(Duration.ofHours(1)),
+            StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
         builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
@@ -821,12 +824,17 @@ public class StreamsBuilderTest {
                                 STREAM_OPERATION_NAME + "-merge");
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldUseGeneratedNameForJoinOperationBetweenKStreamAndKStream() {
         final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
         final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
 
-        streamOne.join(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName(STREAM_OPERATION_NAME));
+        streamOne.join(streamTwo,
+            (value1, value2) -> value1,
+            JoinWindows.of(Duration.ofHours(1)),
+            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
+                .withName(STREAM_OPERATION_NAME));
         builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
@@ -920,7 +928,7 @@ public class StreamsBuilderTest {
     @Test
     public void shouldUseSpecifiedNameForProcessOperation() {
         builder.stream(STREAM_TOPIC)
-                .process(new MockProcessorSupplier<>(), Named.as("test-processor"));
+                .process(new MockApiProcessorSupplier<>(), Named.as("test-processor"));
 
         builder.build();
         final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index fa2b3b3..90dc9e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -42,7 +42,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -67,7 +67,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertNotNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -96,7 +95,7 @@ public class GlobalKTableIntegrationTest {
     private String streamTopic;
     private GlobalKTable<Long, String> globalTable;
     private KStream<String, Long> stream;
-    private MockProcessorSupplier<String, String> supplier;
+    private MockApiProcessorSupplier<String, String, Void, Void> supplier;
 
     @Rule
     public TestName testName = new TestName();
@@ -119,7 +118,7 @@ public class GlobalKTableIntegrationTest {
                                                   .withValueSerde(Serdes.String()));
         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
         stream = builder.stream(streamTopic, stringLongConsumed);
-        supplier = new MockProcessorSupplier<>();
+        supplier = new MockApiProcessorSupplier<>();
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 6d89325..98dec87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -30,7 +30,9 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
@@ -107,7 +109,6 @@ public class GlobalThreadShutDownOrderTest {
     @Rule
     public TestName testName = new TestName();
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Before
     public void before() throws Exception {
         builder = new StreamsBuilder();
@@ -196,8 +197,7 @@ public class GlobalThreadShutDownOrderTest {
     }
 
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private class GlobalStoreProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, Long> {
+    private class GlobalStoreProcessor implements Processor<String, Long, Void, Void> {
 
         private KeyValueStore<String, Long> store;
         private final String storeName;
@@ -207,14 +207,12 @@ public class GlobalThreadShutDownOrderTest {
         }
 
         @Override
-        @SuppressWarnings("unchecked")
-        public void init(final ProcessorContext context) {
-            super.init(context);
-            store = (KeyValueStore<String, Long>) context.getStateStore(storeName);
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(storeName);
         }
 
         @Override
-        public void process(final String key, final Long value) {
+        public void process(final Record<String, Long> record) {
             firstRecordProcessed = true;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index aabf6e2..1c22bfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.integration;
 
+import java.util.Collections;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,7 +29,6 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -65,12 +65,10 @@ import java.util.function.Supplier;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class RangeQueryIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
     private static final Properties STREAMS_CONFIG = new Properties();
     private static final String APP_ID = "range-query-integration-test";
     private static final Long COMMIT_INTERVAL = 100L;
@@ -78,14 +76,14 @@ public class RangeQueryIntegrationTest {
     private static final String TABLE_NAME = "mytable";
     private static final int DATA_SIZE = 5;
 
-    private enum StoreType { InMemory, RocksDB, Timed };
-    private StoreType storeType;
-    private boolean enableLogging;
-    private boolean enableCaching;
-    private boolean forward;
-    private KafkaStreams kafkaStreams;
+    private enum StoreType { InMemory, RocksDB, Timed }
+
+    private final StoreType storeType;
+    private final boolean enableLogging;
+    private final boolean enableCaching;
+    private final boolean forward;
+    private final LinkedList<KeyValue<String, String>> records;
 
-    private LinkedList<KeyValue<String, String>> records;
     private String low;
     private String high;
     private String middle;
@@ -176,13 +174,11 @@ public class RangeQueryIntegrationTest {
     @Test
     public void testStoreConfig() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
-        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, TABLE_NAME, enableLogging, enableCaching);
-        final KTable<String, String> table = builder.table(inputStream, stateStoreConfig);
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, enableLogging, enableCaching);
+        builder.table(inputStream, stateStoreConfig);
 
         try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) {
-            final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams);
-
-            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60));
 
             writeInputData();
 
@@ -221,32 +217,29 @@ public class RangeQueryIntegrationTest {
     }
 
     private List<KeyValue<String, String>> filterList(final KeyValueIterator<String, String> iterator, final String from, final String to) {
-        final Predicate<KeyValue<String, String>> pred = new Predicate<KeyValue<String, String>>() {
-            @Override
-            public boolean test(final KeyValue<String, String> elem) {
-                if (from != null && elem.key.compareTo(from) < 0) {
-                    return false;
-                }
-                if (to != null && elem.key.compareTo(to) > 0) {
-                    return false;
-                }
-                return elem != null;
+        final Predicate<KeyValue<String, String>> predicate = elem -> {
+            if (from != null && elem.key.compareTo(from) < 0) {
+                return false;
+            }
+            if (to != null && elem.key.compareTo(to) > 0) {
+                return false;
             }
+            return elem != null;
         };
 
-        return Utils.toList(iterator, pred);
+        return Utils.toList(iterator, predicate);
     }
 
     private void testRange(final String name, final ReadOnlyKeyValueStore<String, String> store, final String from, final String to, final boolean forward) {
         try (final KeyValueIterator<String, String> resultIterator = forward ? store.range(from, to) : store.reverseRange(from, to);
-             final KeyValueIterator<String, String> expectedIterator = forward ? store.all() : store.reverseAll();) {
+             final KeyValueIterator<String, String> expectedIterator = forward ? store.all() : store.reverseAll()) {
             final List<KeyValue<String, String>> result = Utils.toList(resultIterator);
             final List<KeyValue<String, String>> expected = filterList(expectedIterator, from, to);
-            assertThat(result, is(expected));
+            assertThat(name, result, is(expected));
         }
     }
 
-    private Materialized<String, String, KeyValueStore<Bytes, byte[]>> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) {
+    private Materialized<String, String, KeyValueStore<Bytes, byte[]>> getStoreConfig(final StoreType type, final boolean cachingEnabled, final boolean loggingEnabled) {
         final Supplier<KeyValueBytesStoreSupplier> createStore = () -> {
             if (type == StoreType.InMemory) {
                 return Stores.inMemoryKeyValueStore(TABLE_NAME);
@@ -270,7 +263,7 @@ public class RangeQueryIntegrationTest {
             stateStoreConfig.withCachingDisabled();
         }
         if (loggingEnabled) {
-            stateStoreConfig.withLoggingEnabled(new HashMap<String, String>());
+            stateStoreConfig.withLoggingEnabled(new HashMap<>());
         } else {
             stateStoreConfig.withLoggingDisabled();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 1bff14d..2c0e180 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -40,9 +40,11 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.TrackingS
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -291,10 +293,8 @@ public class RestoreIntegrationTest {
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException {
-
         IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
                                                            asList(KeyValue.pair(1, 1),
                                                                          KeyValue.pair(2, 2),
@@ -430,9 +430,7 @@ public class RestoreIntegrationTest {
         }
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    public static class KeyValueStoreProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
-
+    public static class KeyValueStoreProcessor implements Processor<Integer, Integer, Void, Void> {
         private final String topic;
         private final CountDownLatch processorLatch;
 
@@ -443,22 +441,18 @@ public class RestoreIntegrationTest {
             this.processorLatch = processorLatch;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
-            this.store = (KeyValueStore<Integer, Integer>) context.getStateStore(topic);
+        public void init(final ProcessorContext<Void, Void> context) {
+            this.store = context.getStateStore(topic);
         }
 
         @Override
-        public void process(final Integer key, final Integer value) {
-            if (key != null) {
-                store.put(key, value);
+        public void process(final Record<Integer, Integer> record) {
+            if (record.key() != null) {
+                store.put(record.key(), record.value());
                 processorLatch.countDown();
             }
         }
-
-        @Override
-        public void close() { }
     }
 
     private void createStateForRestoration(final String changelogTopic, final int startingOffset) {
@@ -499,5 +493,4 @@ public class RestoreIntegrationTest {
         consumer.commitSync();
         consumer.close();
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 9c6085f..6c6fc5d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -27,7 +27,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -59,7 +61,6 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 @Category({IntegrationTest.class})
 public class StoreUpgradeIntegrationTest {
     private static final String STORE_NAME = "store";
@@ -953,124 +954,112 @@ public class StoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class KeyValueProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
+    private static class KeyValueProcessor implements Processor<Integer, Integer, Void, Void> {
         private KeyValueStore<Integer, Long> store;
 
-        @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
-            store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(STORE_NAME);
         }
 
         @Override
-        public void process(final Integer key, final Integer value) {
+        public void process(final Record<Integer, Integer> record) {
             final long newCount;
 
-            final Long oldCount = store.get(key);
+            final Long oldCount = store.get(record.key());
             if (oldCount != null) {
                 newCount = oldCount + 1L;
             } else {
                 newCount = 1L;
             }
 
-            store.put(key, newCount);
+            store.put(record.key(), newCount);
         }
 
         @Override
         public void close() {}
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class TimestampedKeyValueProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
-        private ProcessorContext context;
+    private static class TimestampedKeyValueProcessor implements Processor<Integer, Integer, Void, Void> {
         private TimestampedKeyValueStore<Integer, Long> store;
 
-        @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
-            this.context = context;
-            store = (TimestampedKeyValueStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(STORE_NAME);
         }
 
         @Override
-        public void process(final Integer key, final Integer value) {
+        public void process(final Record<Integer, Integer> record) {
             final long newCount;
 
-            final ValueAndTimestamp<Long> oldCountWithTimestamp = store.get(key);
+            final ValueAndTimestamp<Long> oldCountWithTimestamp = store.get(record.key());
             final long newTimestamp;
 
             if (oldCountWithTimestamp == null) {
                 newCount = 1L;
-                newTimestamp = context.timestamp();
+                newTimestamp = record.timestamp();
             } else {
                 newCount = oldCountWithTimestamp.value() + 1L;
-                newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), context.timestamp());
+                newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), record.timestamp());
             }
 
-            store.put(key, ValueAndTimestamp.make(newCount, newTimestamp));
+            store.put(record.key(), ValueAndTimestamp.make(newCount, newTimestamp));
         }
 
         @Override
         public void close() {}
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class WindowedProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
+    private static class WindowedProcessor implements Processor<Integer, Integer, Void, Void> {
         private WindowStore<Integer, Long> store;
 
-        @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
-            store = (WindowStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(STORE_NAME);
         }
 
         @Override
-        public void process(final Integer key, final Integer value) {
+        public void process(final Record<Integer, Integer> record) {
             final long newCount;
 
-            final Long oldCount = store.fetch(key, key < 10 ? 0L : 100000L);
+            final Long oldCount = store.fetch(record.key(), record.key() < 10 ? 0L : 100000L);
             if (oldCount != null) {
                 newCount = oldCount + 1L;
             } else {
                 newCount = 1L;
             }
 
-            store.put(key, newCount, key < 10 ? 0L : 100000L);
+            store.put(record.key(), newCount, record.key() < 10 ? 0L : 100000L);
         }
 
         @Override
         public void close() {}
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class TimestampedWindowedProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
-        private ProcessorContext context;
+    private static class TimestampedWindowedProcessor implements Processor<Integer, Integer, Void, Void> {
         private TimestampedWindowStore<Integer, Long> store;
 
-        @SuppressWarnings("unchecked")
         @Override
-        public void init(final ProcessorContext context) {
-            this.context = context;
-            store = (TimestampedWindowStore<Integer, Long>) context.getStateStore(STORE_NAME);
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(STORE_NAME);
         }
 
         @Override
-        public void process(final Integer key, final Integer value) {
+        public void process(final Record<Integer, Integer> record) {
             final long newCount;
 
-            final ValueAndTimestamp<Long> oldCountWithTimestamp = store.fetch(key, key < 10 ? 0L : 100000L);
+            final ValueAndTimestamp<Long> oldCountWithTimestamp = store.fetch(record.key(), record.key() < 10 ? 0L : 100000L);
             final long newTimestamp;
 
             if (oldCountWithTimestamp == null) {
                 newCount = 1L;
-                newTimestamp = context.timestamp();
+                newTimestamp = record.timestamp();
             } else {
                 newCount = oldCountWithTimestamp.value() + 1L;
-                newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), context.timestamp());
+                newTimestamp = Math.max(oldCountWithTimestamp.timestamp(), record.timestamp());
             }
 
-            store.put(key, ValueAndTimestamp.make(newCount, newTimestamp), key < 10 ? 0L : 100000L);
+            store.put(record.key(), ValueAndTimestamp.make(newCount, newTimestamp), record.key() < 10 ? 0L : 100000L);
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 6f35d12..2aec4ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -28,6 +28,8 @@ import org.apache.kafka.streams.TaskMetadata;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -83,7 +85,6 @@ public class TaskMetadataIntegrationTest {
     private AtomicBoolean process;
     private AtomicBoolean commit;
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Before
     public void setup() {
         final String testId = safeUniqueTestName(getClass(), testName);
@@ -185,18 +186,15 @@ public class TaskMetadataIntegrationTest {
                 timestamp);
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private class PauseProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+    private class PauseProcessor extends ContextualProcessor<String, String, Void, Void> {
         @Override
-        public void process(final String key, final String value) {
+        public void process(final Record<String, String> record) {
             while (!process.get()) {
                 try {
                     wait(100);
                 } catch (final InterruptedException e) {
-
                 }
             }
-            context().forward(key, value);
             if (commit.get()) {
                 context().commit();
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index cad978c..b6d3206 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -38,7 +40,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@SuppressWarnings("deprecation")
 public class RepartitionTopicNamingTest {
 
     private final KeyValueMapper<String, String, String> kvMapper = (k, v) -> k + v;
@@ -104,8 +105,8 @@ public class RepartitionTopicNamingTest {
                                                                      .selectKey((k, v) -> k)
                                                                      .groupByKey(Grouped.as("grouping"));
 
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count().toStream().to("output-one");
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
 
         final String topologyString = builder.build().describe().toString();
         assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
@@ -119,11 +120,11 @@ public class RepartitionTopicNamingTest {
                                                                      .selectKey((k, v) -> k)
                                                                      .groupByKey(Grouped.as("grouping"));
 
-        final TimeWindowedKStream<String, String> timeWindowedKStream = kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L)));
+        final TimeWindowedKStream<String, String> timeWindowedKStream = kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)));
 
         timeWindowedKStream.count().toStream().to("output-one");
         timeWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two");
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
 
         final String topologyString = builder.build().describe().toString();
         assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
@@ -137,11 +138,11 @@ public class RepartitionTopicNamingTest {
                                                                      .selectKey((k, v) -> k)
                                                                      .groupByKey(Grouped.as("grouping"));
 
-        final SessionWindowedKStream<String, String> sessionWindowedKStream = kGroupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(10L)));
+        final SessionWindowedKStream<String, String> sessionWindowedKStream = kGroupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)));
 
         sessionWindowedKStream.count().toStream().to("output-one");
         sessionWindowedKStream.reduce((v, v2) -> v + v2).toStream().to("output-two");
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
 
         final String topologyString = builder.build().describe().toString();
         assertThat(1, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
@@ -166,8 +167,8 @@ public class RepartitionTopicNamingTest {
         final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
                                                                      .selectKey((k, v) -> k)
                                                                      .groupByKey();
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count().toStream().to("output-one");
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");
         final String topologyString = builder.build().describe().toString();
         assertThat(2, is(getCountOfRepartitionTopicsFound(topologyString, repartitionTopicPattern)));
     }
@@ -188,8 +189,8 @@ public class RepartitionTopicNamingTest {
         final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
                                                                      .selectKey((k, v) -> k)
                                                                      .groupByKey(Grouped.as("grouping"));
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count();
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count();
         final Properties properties = new Properties();
         properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         final Topology topology = builder.build(properties);
@@ -207,10 +208,10 @@ public class RepartitionTopicNamingTest {
             final KStream<String, String> stream3 = builder.<String, String>stream("topic3").selectKey((k, v) -> k);
 
             final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2,
-                JoinWindows.of(Duration.ofMillis(30L)),
+                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(30L)),
                 StreamJoined.<String, String, String>as("join-store").withName("join-repartition"));
 
-            joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)),
+            joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(30L)),
                 StreamJoined.<String, String, String>as("join-store").withName("join-repartition"));
 
             builder.build();
@@ -228,8 +229,8 @@ public class RepartitionTopicNamingTest {
         final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
                                                                      .selectKey((k, v) -> k)
                                                                      .groupByKey(Grouped.as("grouping"));
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
-        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count();
+        kGroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count();
         builder.build(properties);
     }
 
@@ -363,15 +364,25 @@ public class RepartitionTopicNamingTest {
 
         if (isGroupByKey) {
             if (otherOperations) {
-                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+                selectKeyStream.filter((k, v) -> true)
+                    .mapValues(v -> v)
+                    .groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName))
+                    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
+                    .count();
             } else {
-                selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+                selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName))
+                    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
+                    .count();
             }
         } else {
             if (otherOperations) {
-                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).count();
+                selectKeyStream.filter((k, v) -> true)
+                    .mapValues(v -> v)
+                    .groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName))
+                    .count();
             } else {
-                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).count();
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName))
+                    .count();
             }
         }
 
@@ -388,15 +399,27 @@ public class RepartitionTopicNamingTest {
         final String groupedSessionWindowRepartitionTopicName = "session-window-grouping";
         if (isGroupByKey) {
             if (otherOperations) {
-                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+                selectKeyStream.filter((k, v) -> true)
+                    .mapValues(v -> v)
+                    .groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName))
+                    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+                    .count();
             } else {
-                selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+                selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName))
+                    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+                    .count();
             }
         } else {
             if (otherOperations) {
-                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+                selectKeyStream.filter((k, v) -> true)
+                    .mapValues(v -> v)
+                    .groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName))
+                    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+                    .count();
             } else {
-                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName))
+                    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
+                    .count();
             }
         }
 
@@ -446,7 +469,7 @@ public class RepartitionTopicNamingTest {
         }
 
         final String joinRepartitionTopicName = "my-join";
-        updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(1000L)),
+        updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1000L)),
             StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName(joinRepartitionTopicName));
 
         return builder.build().describe().toString();
@@ -463,7 +486,6 @@ public class RepartitionTopicNamingTest {
     }
 
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     private Topology buildTopology(final String optimizationConfig) {
         final Initializer<Integer> initializer = () -> 0;
         final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
@@ -495,7 +517,7 @@ public class RepartitionTopicNamingTest {
 
         mappedStream.filter((k, v) -> k.equals("A"))
                 .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
-                        JoinWindows.of(Duration.ofMillis(5000L)),
+                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(5000L)),
                         StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long()).withStoreName(fourthRepartitionTopicName).withName(fourthRepartitionTopicName))
                 .to(JOINED_TOPIC);
 
@@ -506,8 +528,7 @@ public class RepartitionTopicNamingTest {
     }
 
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class SimpleProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+    private static class SimpleProcessor implements Processor<String, String, Void, Void> {
 
         final List<String> valueList;
 
@@ -516,8 +537,8 @@ public class RepartitionTopicNamingTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            valueList.add(value);
+        public void process(final Record<String, String> record) {
+            valueList.add(record.value());
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index ae7ef8f..9920e4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -19,6 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.NoopValueTransformer;
 import org.apache.kafka.test.NoopValueTransformerWithKey;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -30,7 +35,6 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import java.util.Random;
@@ -41,7 +45,6 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class AbstractStreamTest {
 
     @Test
@@ -69,12 +72,11 @@ public class AbstractStreamTest {
         verify(valueTransformerWithKeySupplier);
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testShouldBeExtensible() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final String topicName = "topic";
 
         final ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())));
@@ -108,7 +110,7 @@ public class AbstractStreamTest {
         }
     }
 
-    private static class ExtendedKStreamDummy<K, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
+    private static class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V, K, V> {
 
         private final Random rand;
 
@@ -117,16 +119,16 @@ public class AbstractStreamTest {
         }
 
         @Override
-        public org.apache.kafka.streams.processor.Processor<K, V> get() {
+        public Processor<K, V, K, V> get() {
             return new ExtendedKStreamDummyProcessor();
         }
 
-        private class ExtendedKStreamDummyProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+        private class ExtendedKStreamDummyProcessor extends ContextualProcessor<K, V, K, V> {
             @Override
-            public void process(final K key, final V value) {
+            public void process(final Record<K, V> record) {
                 // flip a coin and filter
                 if (rand.nextBoolean()) {
-                    context().forward(key, value);
+                    context().forward(record);
                 }
             }
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index e4d95e0..cb0218b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -38,7 +38,6 @@ import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
-
 public class GlobalKTableJoinsTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
@@ -56,10 +55,9 @@ public class GlobalKTableJoinsTest {
         keyValueMapper = (key, value) -> value;
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void shouldLeftJoinWithStream() {
-        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream
             .leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
             .process(supplier);
@@ -72,10 +70,9 @@ public class GlobalKTableJoinsTest {
         verifyJoin(expected, supplier);
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void shouldInnerJoinWithStream() {
-        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream
             .join(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
             .process(supplier);
@@ -88,7 +85,7 @@ public class GlobalKTableJoinsTest {
     }
 
     private void verifyJoin(final Map<String, ValueAndTimestamp<String>> expected,
-                            final MockProcessorSupplier<String, String> supplier) {
+                            final MockApiProcessorSupplier<String, String, Void, Void> supplier) {
         final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 333884e..5025e7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -40,8 +40,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -61,7 +61,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KGroupedStreamImplTest {
 
     private static final String TOPIC = "topic";
@@ -95,7 +94,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotHaveNullReducerWithWindowedReduce() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
                 .reduce(null, Materialized.as("store")));
     }
 
@@ -107,7 +106,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
         assertThrows(TopologyException.class, () ->  groupedStream
-                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
                 .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
     }
 
@@ -132,14 +131,14 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotHaveNullInitializerOnWindowedAggregate() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
                 .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
     }
 
     @Test
     public void shouldNotHaveNullAdderOnWindowedAggregate() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
                 .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
     }
 
@@ -151,14 +150,14 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
         assertThrows(TopologyException.class, () ->  groupedStream
-                .windowedBy(TimeWindows.of(ofMillis(10)))
+                .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)))
                 .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
     }
 
     @Test
     public void shouldNotHaveNullReducerWithSlidingWindowedReduce() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
                 .reduce(null, Materialized.as("store")));
     }
 
@@ -170,36 +169,36 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotHaveInvalidStoreNameWithSlidingWindowedReduce() {
         assertThrows(TopologyException.class, () ->  groupedStream
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
                 .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
     }
 
     @Test
     public void shouldNotHaveNullInitializerOnSlidingWindowedAggregate() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
                 .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
     }
 
     @Test
     public void shouldNotHaveNullAdderOnSlidingWindowedAggregate() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
                 .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
     }
 
     @Test
     public void shouldNotHaveInvalidStoreNameOnSlidingWindowedAggregate() {
         assertThrows(TopologyException.class, () ->  groupedStream
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
                 .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
     }
 
     @Test
     public void shouldCountSlidingWindows() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         groupedStream
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
                 .count(Materialized.as("aggregate-by-key-windowed"))
                 .toStream()
                 .process(supplier);
@@ -209,9 +208,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldCountSlidingWindowsWithInternalStoreName() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         groupedStream
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(500L), ofMillis(2000L)))
                 .count()
                 .toStream()
                 .process(supplier);
@@ -219,7 +218,7 @@ public class KGroupedStreamImplTest {
         doCountSlidingWindows(supplier);
     }
 
-    private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>, Long> supplier) {
+    private void doCountSlidingWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
                     driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -314,7 +313,7 @@ public class KGroupedStreamImplTest {
         )));
     }
 
-    private void doAggregateSessionWindows(final MockProcessorSupplier<Windowed<String>, Integer> supplier) {
+    private void doAggregateSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
                     driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -340,9 +339,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldAggregateSessionWindows() {
-        final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KTable<Windowed<String>, Integer> table = groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
             .aggregate(
                 () -> 0,
                 (aggKey, value, aggregate) -> aggregate + 1,
@@ -358,9 +357,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldAggregateSessionWindowsWithInternalStoreName() {
-        final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KTable<Windowed<String>, Integer> table = groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
             .aggregate(
                 () -> 0,
                 (aggKey, value, aggregate) -> aggregate + 1,
@@ -371,8 +370,7 @@ public class KGroupedStreamImplTest {
         doAggregateSessionWindows(supplier);
     }
 
-    private void doCountSessionWindows(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
-
+    private void doCountSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
                     driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -398,9 +396,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldCountSessionWindows() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KTable<Windowed<String>, Long> table = groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
             .count(Materialized.as("session-store"));
         table.toStream().process(supplier);
         doCountSessionWindows(supplier);
@@ -409,16 +407,16 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldCountSessionWindowsWithInternalStoreName() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KTable<Windowed<String>, Long> table = groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
             .count();
         table.toStream().process(supplier);
         doCountSessionWindows(supplier);
         assertNull(table.queryableStoreName());
     }
 
-    private void doReduceSessionWindows(final MockProcessorSupplier<Windowed<String>, String> supplier) {
+    private void doReduceSessionWindows(final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
                     driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -444,9 +442,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldReduceSessionWindows() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KTable<Windowed<String>, String> table = groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
             .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store"));
         table.toStream().process(supplier);
         doReduceSessionWindows(supplier);
@@ -455,9 +453,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldReduceSessionWindowsWithInternalStoreName() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KTable<Windowed<String>, String> table = groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(30)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
             .reduce((value1, value2) -> value1 + ":" + value2);
         table.toStream().process(supplier);
         doReduceSessionWindows(supplier);
@@ -467,7 +465,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
                 .reduce(null, Materialized.as("store")));
     }
 
@@ -479,7 +477,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
         assertThrows(TopologyException.class, () ->  groupedStream
-                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
                 .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME))
         );
     }
@@ -487,7 +485,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
                 .reduce(null, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(null))
         );
     }
@@ -495,7 +493,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
                 .aggregate(null, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
         );
     }
@@ -503,7 +501,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
         assertThrows(NullPointerException.class, () -> groupedStream.
-                windowedBy(SessionWindows.with(ofMillis(30)))
+                windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
                 .aggregate(MockInitializer.STRING_INIT, null, (aggKey, aggOne, aggTwo) -> null, Materialized.as("storeName"))
         );
     }
@@ -511,7 +509,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
         assertThrows(NullPointerException.class, () ->  groupedStream
-                .windowedBy(SessionWindows.with(ofMillis(30)))
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(30)))
                 .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.as("storeName"))
         );
     }
@@ -524,7 +522,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
         groupedStream
-            .windowedBy(SessionWindows.with(ofMillis(10)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(10)))
             .aggregate(
                     MockInitializer.STRING_INIT,
                     MockAggregator.TOSTRING_ADDER,
@@ -535,7 +533,7 @@ public class KGroupedStreamImplTest {
     @Test
     public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
         assertThrows(TopologyException.class, () ->  groupedStream
-                .windowedBy(SessionWindows.with(ofMillis(10)))
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(10)))
                 .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (aggKey, aggOne, aggTwo) -> null, Materialized.as(INVALID_STORE_NAME))
         );
     }
@@ -677,7 +675,7 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldAggregateWithDefaultSerdes() {
-        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         groupedStream
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
             .toStream()
@@ -710,7 +708,7 @@ public class KGroupedStreamImplTest {
         inputTopic.pipeInput("3", (String) null);
     }
 
-    private void doCountWindowed(final  MockProcessorSupplier<Windowed<String>, Long> supplier) {
+    private void doCountWindowed(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
                     driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
@@ -745,9 +743,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldCountWindowed() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMillis(100L)))
             .count(Materialized.as("aggregate-by-key-windowed"))
             .toStream()
             .process(supplier);
@@ -757,9 +755,9 @@ public class KGroupedStreamImplTest {
 
     @Test
     public void shouldCountWindowedWithInternalStoreName() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMillis(100L)))
             .count()
             .toStream()
             .process(supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index bc3f461..c4e9de6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -40,14 +40,13 @@ public class KStreamFilterTest {
 
     private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testFilter() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         final KStream<Integer, String> stream;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filter(isMultipleOfThree).process(supplier);
@@ -62,14 +61,13 @@ public class KStreamFilterTest {
         assertEquals(2, supplier.theCapturedProcessor().processed().size());
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testFilterNot() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
         final KStream<Integer, String> stream;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filterNot(isMultipleOfThree).process(supplier);
@@ -93,6 +91,5 @@ public class KStreamFilterTest {
             .filter(numberKeyPredicate)
             .filterNot(numberKeyPredicate)
             .to("nirvana");
-        
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 1f763a0..69bb32c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -45,7 +45,6 @@ import static org.junit.Assert.assertThrows;
 public class KStreamFlatMapTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testFlatMap() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -63,9 +62,7 @@ public class KStreamFlatMapTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         final KStream<Integer, String> stream;
-        final MockProcessorSupplier<String, String> supplier;
-
-        supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.flatMap(mapper).process(supplier);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index c1930e5..413b628 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -41,7 +41,6 @@ public class KStreamFlatMapValuesTest {
     private final String topicName = "topic";
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testFlatMapValues() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -57,7 +56,7 @@ public class KStreamFlatMapValuesTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream.flatMapValues(mapper).process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -78,7 +77,6 @@ public class KStreamFlatMapValuesTest {
     }
 
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testFlatMapValuesWithKeys() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -94,7 +92,7 @@ public class KStreamFlatMapValuesTest {
         final int[] expectedKeys = {0, 1, 2, 3};
 
         final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream.flatMapValues(mapper).process(supplier);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index 8082255..68fef1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -20,7 +20,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFlatTransformProcessor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Before;
@@ -31,14 +33,13 @@ import java.util.Collections;
 
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamFlatTransformTest extends EasyMockSupport {
 
     private Number inputKey;
     private Number inputValue;
 
     private Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
-    private ProcessorContext context;
+    private InternalProcessorContext<Integer, Integer> context;
 
     private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
 
@@ -47,7 +48,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
         inputKey = 1;
         inputValue = 10;
         transformer = mock(Transformer.class);
-        context = strictMock(ProcessorContext.class);
+        context = strictMock(InternalProcessorContext.class);
         processor = new KStreamFlatTransformProcessor<>(transformer);
     }
 
@@ -72,11 +73,11 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
 
         EasyMock.expect(transformer.transform(inputKey, inputValue)).andReturn(outputRecords);
         for (final KeyValue<Integer, Integer> outputRecord : outputRecords) {
-            context.forward(outputRecord.key, outputRecord.value);
+            context.forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
         }
         replayAll();
 
-        processor.process(inputKey, inputValue);
+        processor.process(new Record<>(inputKey, inputValue, 0L));
 
         verifyAll();
     }
@@ -87,10 +88,10 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
         EasyMock.reset(transformer);
 
         EasyMock.expect(transformer.transform(inputKey, inputValue))
-            .andReturn(Collections.<KeyValue<Integer, Integer>>emptyList());
+            .andReturn(Collections.emptyList());
         replayAll();
 
-        processor.process(inputKey, inputValue);
+        processor.process(new Record<>(inputKey, inputValue, 0L));
 
         verifyAll();
     }
@@ -104,7 +105,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
             .andReturn(null);
         replayAll();
 
-        processor.process(inputKey, inputValue);
+        processor.process(new Record<>(inputKey, inputValue, 0L));
 
         verifyAll();
     }
@@ -129,7 +130,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
         EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
         replayAll();
 
-        final org.apache.kafka.streams.processor.Processor<Number, Number> processor = processorSupplier.get();
+        final Processor<Number, Number, Integer, Integer> processor = processorSupplier.get();
 
         verifyAll();
         assertTrue(processor instanceof KStreamFlatTransformProcessor);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
index fd64604..fcb75e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -24,21 +24,22 @@ import java.util.Collections;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamFlatTransformValuesTest extends EasyMockSupport {
 
     private Integer inputKey;
     private Integer inputValue;
 
     private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
-    private ProcessorContext context;
+    private InternalProcessorContext<Integer, String> context;
 
     private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
 
@@ -47,7 +48,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
         inputKey = 1;
         inputValue = 10;
         valueTransformer = mock(ValueTransformerWithKey.class);
-        context = strictMock(ProcessorContext.class);
+        context = strictMock(InternalProcessorContext.class);
         processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
     }
 
@@ -72,11 +73,11 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
 
         EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(outputValues);
         for (final String outputValue : outputValues) {
-            context.forward(inputKey, outputValue);
+            context.forward(new Record<>(inputKey, outputValue, 0L));
         }
         replayAll();
 
-        processor.process(inputKey, inputValue);
+        processor.process(new Record<>(inputKey, inputValue, 0L));
 
         verifyAll();
     }
@@ -86,10 +87,10 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
         processor.init(context);
         EasyMock.reset(valueTransformer);
 
-        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.<String>emptyList());
+        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.emptyList());
         replayAll();
 
-        processor.process(inputKey, inputValue);
+        processor.process(new Record<>(inputKey, inputValue, 0L));
 
         verifyAll();
     }
@@ -102,7 +103,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
         EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(null);
         replayAll();
 
-        processor.process(inputKey, inputValue);
+        processor.process(new Record<>(inputKey, inputValue, 0L));
 
         verifyAll();
     }
@@ -127,7 +128,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
         EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer);
         replayAll();
 
-        final org.apache.kafka.streams.processor.Processor<Integer, Integer> processor = processorSupplier.get();
+        final Processor<Integer, Integer, Integer, String> processor = processorSupplier.get();
 
         verifyAll();
         assertTrue(processor instanceof KStreamFlatTransformValuesProcessor);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index fda87dc..8c65656 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.After;
@@ -52,10 +52,9 @@ public class KStreamGlobalKTableJoinTest {
     private final int[] expectedKeys = {0, 1, 2, 3};
 
     private TopologyTestDriver driver;
-    private MockProcessor<Integer, String> processor;
+    private MockApiProcessor<Integer, String, Void, Void> processor;
     private StreamsBuilder builder;
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Before
     public void setUp() {
 
@@ -64,7 +63,7 @@ public class KStreamGlobalKTableJoinTest {
         final GlobalKTable<String, String> table; // value of stream optionally contains key of table
         final KeyValueMapper<Integer, String, String> keyMapper;
 
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
         final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index 9268997..fe6c1d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.After;
@@ -51,11 +51,10 @@ public class KStreamGlobalKTableLeftJoinTest {
     private final String globalTableTopic = "globalTableTopic";
     private final int[] expectedKeys = {0, 1, 2, 3};
 
-    private MockProcessor<Integer, String> processor;
+    private MockApiProcessor<Integer, String, Void, Void> processor;
     private TopologyTestDriver driver;
     private StreamsBuilder builder;
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Before
     public void setUp() {
 
@@ -64,7 +63,7 @@ public class KStreamGlobalKTableLeftJoinTest {
         final GlobalKTable<String, String> table; // value of stream optionally contains key of table
         final KeyValueMapper<Integer, String, String> keyMapper;
 
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
         final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 9753453..0af5bd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -62,8 +62,9 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -100,12 +101,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamImplTest {
 
     private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
-    private final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+    private final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
     private final TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier =
         () -> new Transformer<String, String, KeyValue<String, String>>() {
             @Override
@@ -1527,7 +1526,7 @@ public class KStreamImplTest {
             inputTopic.pipeInput("a", "v2");
             inputTopic.pipeInput("b", "v1");
         }
-        final List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2);
+        final List<MockApiProcessor<String, String, Void, Void>> mockProcessors = processorSupplier.capturedProcessors(2);
         assertThat(mockProcessors.get(0).processed(), equalTo(asList(new KeyValueTimestamp<>("a", "v1", 0),
             new KeyValueTimestamp<>("a", "v2", 0))));
         assertThat(mockProcessors.get(1).processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 37c9e49..2ffa048 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -37,8 +37,8 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
@@ -63,15 +63,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamKStreamJoinTest {
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
-    private final JoinWindows joinWindows = JoinWindows.of(ofMillis(50)).grace(Duration.ofMillis(50));
+    private final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(50), Duration.ofMillis(50));
     private final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
     private final String errorMessagePrefix = "Window settings mismatch. WindowBytesStoreSupplier settings";
 
@@ -85,7 +83,7 @@ public class KStreamKStreamJoinTest {
         left.join(
             right,
             Integer::sum,
-            JoinWindows.of(ofMillis(100)),
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)),
             StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
         );
 
@@ -115,8 +113,8 @@ public class KStreamKStreamJoinTest {
         final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
         final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
         final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
-        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
-        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100))).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100))).to("out-to");
         assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
     }
 
@@ -130,8 +128,8 @@ public class KStreamKStreamJoinTest {
         final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
         final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
         final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
-        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
-        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
         final Topology topology =  builder.build(props);
         System.out.println(topology.describe().toString());
         assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());
@@ -140,7 +138,7 @@ public class KStreamKStreamJoinTest {
     @Test
     public void shouldDisableLoggingOnStreamJoined() {
 
-        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
         final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
             .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
             .withStoreName("store")
@@ -167,7 +165,7 @@ public class KStreamKStreamJoinTest {
     @Test
     public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
 
-        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100), Duration.ofMillis(50));
         final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
             .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
             .withStoreName("store")
@@ -295,8 +293,8 @@ public class KStreamKStreamJoinTest {
     }
 
     @Test
-    public void shouldExceptionWhenJoinStoresDontHaveUniqueNames() {
-        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L)).grace(Duration.ofMillis(50L));
+    public void shouldExceptionWhenJoinStoresDoNotHaveUniqueNames() {
+        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), Duration.ofMillis(50L));
         final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
         final WindowBytesStoreSupplier thisStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
         final WindowBytesStoreSupplier otherStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 150L, 100L, true);
@@ -310,7 +308,7 @@ public class KStreamKStreamJoinTest {
 
     @Test
     public void shouldJoinWithCustomStoreSuppliers() {
-        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100L));
+        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
 
         final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore(
             "in-memory-join-store",
@@ -344,7 +342,7 @@ public class KStreamKStreamJoinTest {
 
         final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
         final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KStream<String, Integer> joinedStream;
 
         joinedStream = left.join(
@@ -361,7 +359,7 @@ public class KStreamKStreamJoinTest {
                     driver.createInputTopic("left", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<String, Integer> inputTopicRight =
                     driver.createInputTopic("right", new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<String, Integer> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<String, Integer, Void, Void> processor = supplier.theCapturedProcessor();
 
             inputTopicLeft.pipeInput("A", 1, 1L);
             inputTopicLeft.pipeInput("B", 1, 2L);
@@ -385,13 +383,13 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100L)),
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -407,7 +405,7 @@ public class KStreamKStreamJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // push two items to the primary stream; the other window is empty
             // w1 = {}
@@ -507,7 +505,7 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -529,7 +527,7 @@ public class KStreamKStreamJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // push two items to the primary stream; the other window is empty; this should not produce items yet
             // w1 = {}
@@ -629,14 +627,14 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100L)),
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -652,7 +650,7 @@ public class KStreamKStreamJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
             long time = 0L;
 
             // push two items to the primary stream; the other window is empty; this should produce no items
@@ -1192,14 +1190,14 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(0)).after(ofMillis(100)).grace(ofMillis(0)),
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(0), ofMillis(0)).after(ofMillis(100)),
             StreamJoined.with(Serdes.Integer(),
                 Serdes.String(),
                 Serdes.String())
@@ -1217,7 +1215,7 @@ public class KStreamKStreamJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
             long time = 1000L;
 
             // push four items with increasing timestamps to the primary stream; the other window is empty; this should produce no items
@@ -1460,7 +1458,7 @@ public class KStreamKStreamJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -1468,7 +1466,7 @@ public class KStreamKStreamJoinTest {
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(0)).before(ofMillis(100)),
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).before(ofMillis(100)),
             StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
         );
         joined.process(supplier);
@@ -1484,7 +1482,7 @@ public class KStreamKStreamJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
             long time = 1000L;
 
             // push four items with increasing timestamps to the primary stream; the other window is empty; this should produce no items
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 4e2b6d8..2a29915 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -30,8 +30,8 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
@@ -47,8 +47,6 @@ import java.util.Set;
 import static java.time.Duration.ofMillis;
 import static org.junit.Assert.assertEquals;
 
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamKStreamLeftJoinTest {
     private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
 
@@ -57,6 +55,7 @@ public class KStreamKStreamLeftJoinTest {
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -66,7 +65,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -83,7 +82,7 @@ public class KStreamKStreamLeftJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // Only 2 window stores should be available
             assertEquals(2, driver.getAllStateStores().size());
@@ -116,6 +115,7 @@ public class KStreamKStreamLeftJoinTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -123,7 +123,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -140,7 +140,7 @@ public class KStreamKStreamLeftJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // Only 2 window stores should be available
             assertEquals(2, driver.getAllStateStores().size());
@@ -165,7 +165,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -182,7 +182,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // verifies non-joined duplicates are emitted when window has closed
             inputTopic1.pipeInput(0, "A0", 0L);
@@ -221,7 +221,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -238,7 +238,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -276,7 +276,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -293,7 +293,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -331,7 +331,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -348,7 +348,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -383,7 +383,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -400,7 +400,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -462,7 +462,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -485,7 +485,7 @@ public class KStreamKStreamLeftJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // 2 window stores + 1 shared window store should be available
             assertEquals(3, driver.getAllStateStores().size());
@@ -573,7 +573,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -590,7 +590,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // push two items to the primary stream; the other window is empty; this should not produce any item yet
             // w1 = {}
@@ -624,7 +624,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -647,7 +647,7 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // push two items to the primary stream; the other window is empty; this should not produce items because window has not closed
             // w1 = {}
@@ -696,7 +696,7 @@ public class KStreamKStreamLeftJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -719,7 +719,7 @@ public class KStreamKStreamLeftJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
             final long time = 0L;
 
             // push two items to the primary stream; the other window is empty; this should not produce any items
@@ -751,7 +751,7 @@ public class KStreamKStreamLeftJoinTest {
 
     private void testUpperWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
-                                      final MockProcessor<Integer, String> processor) {
+                                      final MockApiProcessor<Integer, String, Void, Void> processor) {
         long time;
 
         final TestInputTopic<Integer, String> inputTopic1 =
@@ -895,7 +895,7 @@ public class KStreamKStreamLeftJoinTest {
 
     private void testLowerWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
-                                      final MockProcessor<Integer, String> processor) {
+                                      final MockApiProcessor<Integer, String, Void, Void> processor) {
         long time;
         final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2d9e320..0fcbfeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -30,8 +30,8 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
@@ -47,14 +47,13 @@ import java.util.Set;
 import static java.time.Duration.ofMillis;
 import static org.junit.Assert.assertEquals;
 
-
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamKStreamOuterJoinTest {
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
     private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testOuterJoinDuplicatesWithFixDisabledOldApi() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -62,7 +61,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -79,7 +78,7 @@ public class KStreamKStreamOuterJoinTest {
                     driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                     driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // Only 2 window stores should be available
             assertEquals(2, driver.getAllStateStores().size());
@@ -106,7 +105,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -123,7 +122,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // verifies non-joined duplicates are emitted when window has closed
             inputTopic1.pipeInput(0, "A0", 0L);
@@ -173,7 +172,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -190,7 +189,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -228,7 +227,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -245,7 +244,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -283,7 +282,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -300,7 +299,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -338,7 +337,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
@@ -355,7 +354,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             final long windowStart = 0L;
 
@@ -393,7 +392,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -410,7 +409,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // push two items to the primary stream; the other window is empty; this should not produce any item yet
             // w1 = {}
@@ -443,7 +442,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -466,7 +465,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // push one item to the primary stream; and one item in other stream; this should not produce items because there are no joins
             // and window has not ended
@@ -542,7 +541,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -565,7 +564,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
 
             // 2 window stores + 1 shared window store should be available
             assertEquals(3, driver.getAllStateStores().size());
@@ -655,7 +654,7 @@ public class KStreamKStreamOuterJoinTest {
         final KStream<Integer, String> stream1;
         final KStream<Integer, String> stream2;
         final KStream<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
@@ -678,7 +677,7 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final TestInputTopic<Integer, String> inputTopic2 =
                 driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
             final long time = 0L;
 
             // push two items to the primary stream; the other window is empty; this should not produce items because window has not closed
@@ -711,7 +710,7 @@ public class KStreamKStreamOuterJoinTest {
 
     private void testUpperWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
-                                      final MockProcessor<Integer, String> processor) {
+                                      final MockApiProcessor<Integer, String, Void, Void> processor) {
         long time;
 
         final TestInputTopic<Integer, String> inputTopic1 =
@@ -858,7 +857,7 @@ public class KStreamKStreamOuterJoinTest {
 
     private void testLowerWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
-                                      final MockProcessor<Integer, String> processor) {
+                                      final MockApiProcessor<Integer, String, Void, Void> processor) {
         long time;
         final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 3c515b3..d162acf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -44,8 +44,8 @@ import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.After;
@@ -61,12 +61,11 @@ public class KStreamKTableJoinTest {
     private TestInputTopic<Integer, String> inputTableTopic;
     private final int[] expectedKeys = {0, 1, 2, 3};
 
-    private MockProcessor<Integer, String> processor;
+    private MockApiProcessor<Integer, String, Void, Void> processor;
     private TopologyTestDriver driver;
     private StreamsBuilder builder;
-    private final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Before
     public void setUp() {
         builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index d9f227c..d68f9bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.After;
@@ -56,10 +56,9 @@ public class KStreamKTableLeftJoinTest {
     private final int[] expectedKeys = {0, 1, 2, 3};
 
     private TopologyTestDriver driver;
-    private MockProcessor<Integer, String> processor;
+    private MockApiProcessor<Integer, String, Void, Void> processor;
     private StreamsBuilder builder;
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Before
     public void setUp() {
         builder = new StreamsBuilder();
@@ -67,7 +66,7 @@ public class KStreamKTableLeftJoinTest {
         final KStream<Integer, String> stream;
         final KTable<Integer, String> table;
 
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
         stream = builder.stream(streamTopic, consumed);
         table = builder.table(tableTopic, consumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index bf5ff26..c171b87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -43,14 +43,13 @@ import static org.junit.Assert.assertThrows;
 public class KStreamMapTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testMap() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topicName = "topic";
         final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.map((key, value) -> KeyValue.pair(value, key)).process(supplier);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index fe1e4a3..f3a773b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -36,10 +36,9 @@ import static org.junit.Assert.assertArrayEquals;
 
 public class KStreamMapValuesTest {
     private final String topicName = "topic";
-    private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
+    private final MockApiProcessorSupplier<Integer, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testFlatMapValues() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -64,7 +63,6 @@ public class KStreamMapValuesTest {
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testMapValuesWithKeys() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index d8e70f9..b1e9d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -41,7 +41,6 @@ public class KStreamSelectKeyTest {
     private final String topicName = "topic_key_select";
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testSelectKey() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -58,7 +57,7 @@ public class KStreamSelectKeyTest {
 
         final KStream<String, Integer>  stream =
             builder.stream(topicName, Consumed.with(Serdes.String(), Serdes.Integer()));
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream.selectKey((key, value) -> keyMap.get(value)).process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -73,7 +72,6 @@ public class KStreamSelectKeyTest {
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
         }
-
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 38c3fa7..8e8115f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -47,9 +47,9 @@ import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier
 import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.hamcrest.Matcher;
@@ -83,7 +83,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 @RunWith(Parameterized.class)
 public class KStreamSlidingWindowAggregateTest {
 
@@ -101,7 +100,6 @@ public class KStreamSlidingWindowAggregateTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -114,13 +112,13 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 Materialized.as(storeSupplier)
             );
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table.toStream().process(supplier);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
@@ -158,7 +156,6 @@ public class KStreamSlidingWindowAggregateTest {
         assertEquals(expected, actual);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -171,12 +168,12 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
             .reduce(
                 MockReducer.STRING_ADDER,
                 Materialized.as(storeSupplier)
             );
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table.toStream().process(supplier);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic =
@@ -227,14 +224,14 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 Materialized.as(storeSupplier)
             );
 
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -390,7 +387,7 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table1 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -399,14 +396,14 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 Materialized.as(storeSupplier2)
             );
 
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table1.toStream().process(supplier);
 
         table2.toStream().process(supplier);
@@ -422,7 +419,7 @@ public class KStreamSlidingWindowAggregateTest {
             inputTopic1.pipeInput("B", "2", 11L);
             inputTopic1.pipeInput("C", "3", 12L);
 
-            final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+            final List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = supplier.capturedProcessors(3);
 
             processors.get(0).checkAndClearProcessResult(
                     // left windows created by the first set of records to table 1
@@ -492,7 +489,6 @@ public class KStreamSlidingWindowAggregateTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testEarlyRecordsSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -501,13 +497,13 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(50), ofMillis(200)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(50), ofMillis(200)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
             );
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -526,7 +522,7 @@ public class KStreamSlidingWindowAggregateTest {
         for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
             final Windowed<String> window = entry.key();
             final Long start = window.window().start();
-            final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
             if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
                 actual.replace(start, valueAndTimestamp);
             }
@@ -543,7 +539,6 @@ public class KStreamSlidingWindowAggregateTest {
         assertEquals(expected, actual);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testEarlyRecordsRepeatedInput() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -552,13 +547,13 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(5), ofMillis(20)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(5), ofMillis(20)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
             );
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -578,7 +573,7 @@ public class KStreamSlidingWindowAggregateTest {
         for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
             final Windowed<String> window = entry.key();
             final Long start = window.window().start();
-            final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
             if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
                 actual.replace(start, valueAndTimestamp);
             }
@@ -603,14 +598,14 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 Materialized.as(storeSupplier)
             );
 
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -793,7 +788,6 @@ public class KStreamSlidingWindowAggregateTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testAggregateRandomInput() {
 
@@ -821,7 +815,7 @@ public class KStreamSlidingWindowAggregateTest {
                 },
                 Materialized.as(storeSupplier)
             );
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table.toStream().process(supplier);
         final long seed = new Random().nextLong();
         final Random shuffle = new Random(seed);
@@ -856,8 +850,8 @@ public class KStreamSlidingWindowAggregateTest {
             try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                 final TestInputTopic<String, String> inputTopic1 =
                     driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
-                for (int i = 0; i < input.size(); i++) {
-                    inputTopic1.pipeInput("A", input.get(i).value(), input.get(i).timestamp());
+                for (final ValueAndTimestamp<String> i : input) {
+                    inputTopic1.pipeInput("A", i.value(), i.timestamp());
                 }
             }
 
@@ -866,7 +860,7 @@ public class KStreamSlidingWindowAggregateTest {
             for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
                 final Windowed<String> window = entry.key();
                 final Long start = window.window().start();
-                final ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+                final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
                 if (results.putIfAbsent(start, valueAndTimestamp) != null) {
                     results.replace(start, valueAndTimestamp);
                 }
@@ -878,12 +872,8 @@ public class KStreamSlidingWindowAggregateTest {
                 t
             );
         } catch (final Throwable t) {
-            final StringBuilder sb =
-                new StringBuilder()
-                    .append("Exception in randomized scenario. Reproduce with seed: ")
-                    .append(seed)
-                    .append(".");
-            throw new AssertionError(sb.toString(), t);
+            final String msg = "Exception in randomized scenario. Reproduce with seed: " + seed + ".";
+            throw new AssertionError(msg, t);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 983e52d..0f2975f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -27,9 +27,10 @@ import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -46,13 +47,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 
 @RunWith(EasyMockRunner.class)
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamTransformValuesTest {
     private final String topicName = "topic";
     private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
     @Mock(MockType.NICE)
-    private ProcessorContext context;
+    private InternalProcessorContext context;
 
     @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
@@ -64,7 +64,7 @@ public class KStreamTransformValuesTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext context) { }
+                public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { }
 
                 @Override
                 public Integer transform(final Number value) {
@@ -107,7 +107,7 @@ public class KStreamTransformValuesTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext context) { }
+                public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { }
 
                 @Override
                 public Integer transform(final Integer readOnlyKey, final Number value) {
@@ -145,7 +145,7 @@ public class KStreamTransformValuesTest {
     public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
         final NoOpValueTransformerWithKeySupplier<String, String> transformer = new NoOpValueTransformerWithKeySupplier<>();
         final KStreamTransformValues<String, String, String> transformValues = new KStreamTransformValues<>(transformer);
-        final org.apache.kafka.streams.processor.Processor<String, String> processor = transformValues.get();
+        final Processor<String, String, String, String> processor = transformValues.get();
 
         processor.init(context);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index df40c94..750f7f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -38,9 +38,9 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.hamcrest.Matcher;
 import org.junit.Test;
@@ -62,12 +62,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("deprecation")
 public class KStreamWindowAggregateTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testAggBasic() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -76,10 +74,10 @@ public class KStreamWindowAggregateTest {
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(100)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
 
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -145,7 +143,6 @@ public class KStreamWindowAggregateTest {
         );
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -155,16 +152,16 @@ public class KStreamWindowAggregateTest {
         final KTable<Windowed<String>, String> table1 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(100)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
 
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table1.toStream().process(supplier);
 
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(100)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()));
         table2.toStream().process(supplier);
 
@@ -181,7 +178,7 @@ public class KStreamWindowAggregateTest {
             inputTopic1.pipeInput("D", "4", 3L);
             inputTopic1.pipeInput("A", "1", 9L);
 
-            final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
+            final List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = supplier.capturedProcessors(3);
 
             processors.get(0).checkAndClearProcessResult(
                 new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),  "0+1",  0),
@@ -274,7 +271,7 @@ public class KStreamWindowAggregateTest {
         builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.toStringInstance("+"),
@@ -299,7 +296,7 @@ public class KStreamWindowAggregateTest {
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
         stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-               .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90)))
+               .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(90)).advanceBy(ofMillis(5)))
                .aggregate(
                    () -> "",
                    MockAggregator.toStringInstance("+"),
@@ -362,7 +359,7 @@ public class KStreamWindowAggregateTest {
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
         stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-               .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L)))
+               .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(90L)).advanceBy(ofMillis(10)))
                .aggregate(
                    () -> "",
                    MockAggregator.toStringInstance("+"),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index d3ed6b5..0974ed6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -36,8 +36,6 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -66,12 +64,11 @@ public class KTableFilterTest {
 
     private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     private void doTestKTable(final StreamsBuilder builder,
                               final KTable<String, Integer> table2,
                               final KTable<String, Integer> table3,
                               final String topic) {
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
         table3.toStream().process(supplier);
 
@@ -86,7 +83,7 @@ public class KTableFilterTest {
             inputTopic.pipeInput("B", null, 15L);
         }
 
-        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+        final List<MockApiProcessor<String, Integer, Void, Void>> processors = supplier.capturedProcessors(2);
 
         processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", null, 10),
             new KeyValueTimestamp<>("B", 2, 5),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 462423a..3dd8a2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -45,10 +45,10 @@ import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -83,7 +83,6 @@ public class KTableImplTest {
         table = new StreamsBuilder().table("test");
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -93,7 +92,7 @@ public class KTableImplTest {
 
         final KTable<String, String> table1 = builder.table(topic1, consumed);
 
-        final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Object, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table1.toStream().process(supplier);
 
         final KTable<String, Integer> table2 = table1.mapValues(s -> Integer.valueOf(s));
@@ -117,7 +116,7 @@ public class KTableImplTest {
             inputTopic.pipeInput("A", "06", 8L);
         }
 
-        final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+        final List<MockApiProcessor<String, Object, Void, Void>> processors = supplier.capturedProcessors(4);
         assertEquals(asList(
             new KeyValueTimestamp<>("A", "01", 5),
             new KeyValueTimestamp<>("B", "02", 100),
@@ -152,7 +151,6 @@ public class KTableImplTest {
             processors.get(3).processed());
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testMaterializedKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -162,7 +160,7 @@ public class KTableImplTest {
 
         final KTable<String, String> table1 = builder.table(topic1, consumed, Materialized.as("fred"));
 
-        final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Object, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table1.toStream().process(supplier);
 
         final KTable<String, Integer> table2 = table1.mapValues(s -> Integer.valueOf(s));
@@ -186,7 +184,7 @@ public class KTableImplTest {
             inputTopic.pipeInput("A", "06", 8L);
         }
 
-        final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+        final List<MockApiProcessor<String, Object, Void, Void>> processors = supplier.capturedProcessors(4);
         assertEquals(asList(
             new KeyValueTimestamp<>("A", "01", 5),
             new KeyValueTimestamp<>("B", "02", 100),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index e761968..360c37e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -39,7 +39,6 @@ import static org.junit.Assert.assertEquals;
 public class KTableMapKeysTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testMapKeysConvertingToStream() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -60,7 +59,7 @@ public class KTableMapKeysTest {
         final int[] originalKeys = new int[] {1, 2, 3};
         final String[] values = new String[] {"V_ONE", "V_TWO", "V_THREE"};
 
-        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         convertedStream.process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index ff6e1b9..a7858b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
@@ -56,7 +55,7 @@ public class KTableMapValuesTest {
 
     private void doTestKTable(final StreamsBuilder builder,
                               final String topic1,
-                              final MockProcessorSupplier<String, Integer> supplier) {
+                              final MockApiProcessorSupplier<String, Integer, Void, Void> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic1 =
                     driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
@@ -71,7 +70,6 @@ public class KTableMapValuesTest {
         }
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -80,13 +78,12 @@ public class KTableMapValuesTest {
         final KTable<String, String> table1 = builder.table(topic1, consumed);
         final KTable<String, Integer> table2 = table1.mapValues(value -> value.charAt(0) - 48);
 
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         doTestKTable(builder, topic1, supplier);
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testQueryableKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -99,7 +96,7 @@ public class KTableMapValuesTest {
                 Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName")
                     .withValueSerde(Serdes.Integer()));
 
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table2.toStream().process(supplier);
 
         doTestKTable(builder, topic1, supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 83b8ac8..68c005a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -61,7 +60,6 @@ public class KTableSourceTest {
     private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     @Test
     public void testKTable() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -69,7 +67,7 @@ public class KTableSourceTest {
 
         final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer()));
 
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         table1.toStream().process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index f26fbc6..8c7d179 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -37,8 +37,8 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -54,7 +54,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class SessionWindowedKStreamImplTest {
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
@@ -66,7 +65,7 @@ public class SessionWindowedKStreamImplTest {
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SessionWindows.with(ofMillis(500)));
+                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)));
     }
 
     @Test
@@ -81,7 +80,7 @@ public class SessionWindowedKStreamImplTest {
     }
 
     private void shouldCountSessionWindowed() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream.count()
             .toStream()
             .process(supplier);
@@ -107,7 +106,7 @@ public class SessionWindowedKStreamImplTest {
 
     @Test
     public void shouldReduceWindowed() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream.reduce(MockReducer.STRING_ADDER)
             .toStream()
             .process(supplier);
@@ -133,7 +132,7 @@ public class SessionWindowedKStreamImplTest {
 
     @Test
     public void shouldAggregateSessionWindowed() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         stream.aggregate(MockInitializer.STRING_INIT,
                          MockAggregator.TOSTRING_ADDER,
                          sessionMerger,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
index e4a965e..64df966 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
@@ -37,8 +37,8 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -54,7 +54,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class SlidingWindowedKStreamImplTest {
 
     private static final String TOPIC = "input";
@@ -67,12 +66,12 @@ public class SlidingWindowedKStreamImplTest {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         windowedStream = stream.
             groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
     }
 
     @Test
     public void shouldCountSlidingWindows() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         windowedStream
             .count()
             .toStream()
@@ -113,7 +112,7 @@ public class SlidingWindowedKStreamImplTest {
 
     @Test
     public void shouldReduceSlidingWindows() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         windowedStream
             .reduce(MockReducer.STRING_ADDER)
             .toStream()
@@ -154,7 +153,7 @@ public class SlidingWindowedKStreamImplTest {
 
     @Test
     public void shouldAggregateSlidingWindows() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         windowedStream
             .aggregate(
                 MockInitializer.STRING_INIT,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index aef79c8..f5f3ff8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -35,8 +35,8 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -52,7 +52,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class TimeWindowedKStreamImplTest {
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
@@ -64,12 +63,12 @@ public class TimeWindowedKStreamImplTest {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         windowedStream = stream.
             groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(500L)));
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
     }
 
     @Test
     public void shouldCountWindowed() {
-        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
         windowedStream
             .count()
             .toStream()
@@ -94,7 +93,7 @@ public class TimeWindowedKStreamImplTest {
 
     @Test
     public void shouldReduceWindowed() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         windowedStream
             .reduce(MockReducer.STRING_ADDER)
             .toStream()
@@ -119,7 +118,7 @@ public class TimeWindowedKStreamImplTest {
 
     @Test
     public void shouldAggregateWindowed() {
-        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
         windowedStream
             .aggregate(
                 MockInitializer.STRING_INIT,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index a898338..f8a7073 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.junit.Test;
 
@@ -31,7 +33,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class GraphGraceSearchUtilTest {
     @Test
     public void shouldThrowOnNull() {
@@ -50,12 +51,12 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
             "stateful",
             new ProcessorParameters<>(
-                () -> new org.apache.kafka.streams.processor.Processor<String, Long>() {
+                () -> new Processor<String, Long, String, Long>() {
                     @Override
-                    public void init(final ProcessorContext context) {}
+                    public void init(final ProcessorContext<String, Long> context) {}
 
                     @Override
-                    public void process(final String key, final Long value) {}
+                    public void process(final Record<String, Long> record) {}
 
                     @Override
                     public void close() {}
@@ -78,7 +79,7 @@ public class GraphGraceSearchUtilTest {
 
     @Test
     public void shouldExtractGraceFromKStreamWindowAggregateNode() {
-        final TimeWindows windows = TimeWindows.of(ofMillis(10L)).grace(ofMillis(1234L));
+        final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(1234L));
         final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
             "asdf",
             new ProcessorParameters<>(
@@ -99,7 +100,7 @@ public class GraphGraceSearchUtilTest {
 
     @Test
     public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
-        final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
+        final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
 
         final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
             "asdf",
@@ -122,7 +123,7 @@ public class GraphGraceSearchUtilTest {
 
     @Test
     public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
-        final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
+        final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
             new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
@@ -134,12 +135,12 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
             "stateful",
             new ProcessorParameters<>(
-                () -> new org.apache.kafka.streams.processor.Processor<String, Long>() {
+                () -> new Processor<String, Long, String, Long>() {
                     @Override
-                    public void init(final ProcessorContext context) {}
+                    public void init(final ProcessorContext<String, Long> context) {}
 
                     @Override
-                    public void process(final String key, final Long value) {}
+                    public void process(final Record<String, Long> record) {}
 
                     @Override
                     public void close() {}
@@ -159,7 +160,7 @@ public class GraphGraceSearchUtilTest {
 
     @Test
     public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
-        final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
+        final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
             new ProcessorParameters<>(
@@ -191,7 +192,7 @@ public class GraphGraceSearchUtilTest {
             "asdf",
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
-                    SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L)),
+                    SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)),
                     "asdf",
                     null,
                     null,
@@ -206,7 +207,7 @@ public class GraphGraceSearchUtilTest {
             "asdf",
             new ProcessorParameters<>(
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
-                    TimeWindows.of(ofMillis(10L)).grace(ofMillis(4321L)),
+                    TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
                     "asdf",
                     null,
                     null
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
index 99be7f8..987784b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
@@ -17,19 +17,21 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
 
 public class TableProcessorNodeTest {
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class TestProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+    private static class TestProcessor implements Processor<String, String, String, String> {
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<String, String> context) {
         }
 
         @Override
-        public void process(final String key, final String value) {
+        public void process(final Record<String, String> record) {
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 87a4c68..dfa3f9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -27,7 +27,8 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -49,29 +50,28 @@ import static org.junit.Assert.assertTrue;
 
 public class ProcessorNodeTest {
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
-        final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
-        final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class ExceptionalProcessor implements org.apache.kafka.streams.processor.Processor<Object, Object> {
+    private static class ExceptionalProcessor implements Processor<Object, Object, Object, Object> {
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final ProcessorContext<Object, Object> context) {
             throw new RuntimeException();
         }
 
         @Override
-        public void process(final Object key, final Object value) {
+        public void process(final Record<Object, Object> record) {
             throw new RuntimeException();
         }
 
@@ -81,21 +81,9 @@ public class ProcessorNodeTest {
         }
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class NoOpProcessor implements org.apache.kafka.streams.processor.Processor<Object, Object> {
+    private static class NoOpProcessor implements Processor<Object, Object, Object, Object> {
         @Override
-        public void init(final ProcessorContext context) {
-
-        }
-
-        @Override
-        public void process(final Object key, final Object value) {
-
-        }
-
-        @Override
-        public void close() {
-
+        public void process(final Record<Object, Object> record) {
         }
     }
 
@@ -105,7 +93,8 @@ public class ProcessorNodeTest {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
-        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("name", new NoOpProcessor(), Collections.emptySet());
         node.init(context);
 
         final String threadId = Thread.currentThread().getName();
@@ -140,9 +129,7 @@ public class ProcessorNodeTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         builder.<String, String>stream("streams-plaintext-input")
-            .flatMapValues(value -> {
-                return Collections.singletonList("");
-            });
+            .flatMapValues(value -> Collections.singletonList(""));
         final Topology topology = builder.build();
         final Properties config = new Properties();
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
@@ -164,9 +151,7 @@ public class ProcessorNodeTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         builder.<String, String>stream("streams-plaintext-input")
-            .flatMapValues(value -> {
-                return Collections.singletonList("");
-            });
+            .flatMapValues(value -> Collections.singletonList(""));
         final Topology topology = builder.build();
 
         final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology));
@@ -178,11 +163,11 @@ public class ProcessorNodeTest {
     private static class ClassCastProcessor extends ExceptionalProcessor {
 
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final ProcessorContext<Object, Object> context) {
         }
 
         @Override
-        public void process(final Object key, final Object value) {
+        public void process(final Record<Object, Object> record) {
             throw new ClassCastException("Incompatible types simulation exception.");
         }
     }
@@ -193,7 +178,8 @@ public class ProcessorNodeTest {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
-        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
         final StreamsException se = assertThrows(
             StreamsException.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index cde573a..fe1bb3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.test.MockProcessorNode;
@@ -29,12 +28,7 @@ public class PunctuationQueueTest {
 
     private final MockProcessorNode<String, String, ?, ?> node = new MockProcessorNode<>();
     private final PunctuationQueue queue = new PunctuationQueue();
-    private final Punctuator punctuator = new Punctuator() {
-        @Override
-        public void punctuate(final long timestamp) {
-            node.mockProcessor.punctuatedStreamTime().add(timestamp);
-        }
-    };
+    private final Punctuator punctuator = timestamp -> node.mockProcessor.punctuatedStreamTime().add(timestamp);
 
     @Test
     public void testPunctuationInterval() {
@@ -43,12 +37,7 @@ public class PunctuationQueueTest {
 
         queue.schedule(sched);
 
-        final ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
-            @Override
-            public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
-                punctuator.punctuate(timestamp);
-            }
-        };
+        final ProcessorNodePunctuator processorNodePunctuator = (node, timestamp, type, punctuator) -> punctuator.punctuate(timestamp);
 
         queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
@@ -82,12 +71,8 @@ public class PunctuationQueueTest {
 
         queue.schedule(sched);
 
-        final ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
-            @Override
-            public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
-                punctuator.punctuate(timestamp);
-            }
-        };
+        final ProcessorNodePunctuator processorNodePunctuator =
+            (node, timestamp, type, punctuator) -> punctuator.punctuate(timestamp);
 
         queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
@@ -121,13 +106,10 @@ public class PunctuationQueueTest {
 
         final Cancellable cancellable = queue.schedule(sched);
 
-        final ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
-            @Override
-            public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
-                punctuator.punctuate(timestamp);
-                // simulate scheduler cancelled from within punctuator
-                cancellable.cancel();
-            }
+        final ProcessorNodePunctuator processorNodePunctuator = (node, timestamp, type, punctuator) -> {
+            punctuator.punctuate(timestamp);
+            // simulate scheduler cancelled from within punctuator
+            cancellable.cancel();
         };
 
         queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
@@ -139,18 +121,4 @@ public class PunctuationQueueTest {
         queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
     }
-
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class TestProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
-
-        @Override
-        public void init(final ProcessorContext context) {}
-
-        @Override
-        public void process(final String key, final String value) {}
-
-        @Override
-        public void close() {}
-    }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index eb817cc..60aeb7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -44,6 +44,8 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.After;
@@ -134,10 +136,7 @@ public class RepartitionOptimizingTest {
         runTest(StreamsConfig.NO_OPTIMIZATION, FOUR_REPARTITION_TOPICS);
     }
 
-
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) {
-
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> sourceStream =
@@ -258,8 +257,7 @@ public class RepartitionOptimizingTest {
         return keyValueList;
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private static class SimpleProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
+    private static class SimpleProcessor implements Processor<String, String, Void, Void> {
 
         final List<String> valueList;
 
@@ -268,8 +266,8 @@ public class RepartitionOptimizingTest {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            valueList.add(value);
+        public void process(final Record<String, String> record) {
+            valueList.add(record.value());
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b29961..bebd417 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -64,7 +64,11 @@ import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -1715,20 +1719,19 @@ public class StreamThreadTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     public void shouldPunctuateActiveTask() {
         final List<Long> punctuatedStreamTime = new ArrayList<>();
         final List<Long> punctuatedWallClockTime = new ArrayList<>();
-        final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> punctuateProcessor =
-            () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
+        final ProcessorSupplier<Object, Object, Void, Void> punctuateProcessor =
+            () -> new ContextualProcessor<Object, Object, Void, Void>() {
                 @Override
-                public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+                public void init(final ProcessorContext<Void, Void> context) {
                     context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
                     context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
                 }
 
                 @Override
-                public void process(final Object key, final Object value) {}
+                public void process(final Record<Object, Object> record) {}
             };
 
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
@@ -1786,7 +1789,6 @@ public class StreamThreadTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
         final org.apache.kafka.streams.kstream.TransformerSupplier<Object, Object, KeyValue<Object, Object>> punctuateProcessor =
             () -> new org.apache.kafka.streams.kstream.Transformer<Object, Object, KeyValue<Object, Object>>() {
@@ -1806,13 +1808,8 @@ public class StreamThreadTest {
             };
 
         final List<Long> peekedContextTime = new ArrayList<>();
-        final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> peekProcessor =
-            () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
-                @Override
-                public void process(final Object key, final Object value) {
-                    peekedContextTime.add(context.timestamp());
-                }
-            };
+        final ProcessorSupplier<Object, Object, Void, Void> peekProcessor =
+            () -> (Processor<Object, Object, Void, Void>) record -> peekedContextTime.add(record.timestamp());
 
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .transform(punctuateProcessor)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index b9b24da..3b1aa44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -99,7 +99,6 @@ public class EosTestClient extends SmokeTestUtil {
         }
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     private KafkaStreams createKafkaStreams(final Properties props) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 86f7583..936a41a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -50,7 +50,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
-@SuppressWarnings("deprecation")
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
@@ -163,7 +162,6 @@ public class SmokeTestClient extends SmokeTestUtil {
         return fullProps;
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
@@ -177,7 +175,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
 
         final KTable<Windowed<String>, Integer> minAggregation = groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1)))
             .aggregate(
                 () -> Integer.MAX_VALUE,
                 (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
@@ -197,7 +195,7 @@ public class SmokeTestClient extends SmokeTestUtil {
             .to("min", Produced.with(stringSerde, intSerde));
 
         final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
-            .windowedBy(TimeWindows.of(Duration.ofSeconds(2)).advanceBy(Duration.ofSeconds(1)).grace(Duration.ofSeconds(30)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
             .reduce((l, r) -> l + r);
 
         streamify(smallWindowSum, "sws-raw");
@@ -212,7 +210,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // max
         groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
             .aggregate(
                 () -> Integer.MIN_VALUE,
                 (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
@@ -229,7 +227,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // sum
         groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
             .aggregate(
                 () -> 0L,
                 (aggKey, value, aggregate) -> (long) value + aggregate,
@@ -244,7 +242,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // cnt
         groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
             .count(Materialized.as("uwin-cnt"))
             .toStream(new Unwindow<>())
             .filterNot((k, v) -> k.equals("flush"))
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 1222a81..e0c45d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -23,68 +23,67 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.time.Instant;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
-    static org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
+    static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
 
-    static org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final String name) {
-        return new org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object>() {
+    static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
+        return () -> new ContextualProcessor<Object, Object, Void, Void>() {
+            private int numRecordsProcessed = 0;
+            private long smallestOffset = Long.MAX_VALUE;
+            private long largestOffset = Long.MIN_VALUE;
+
             @Override
-            public org.apache.kafka.streams.processor.Processor<Object, Object> get() {
-                return new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
-                    private int numRecordsProcessed = 0;
-                    private long smallestOffset = Long.MAX_VALUE;
-                    private long largestOffset = Long.MIN_VALUE;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        super.init(context);
-                        System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
-                        System.out.flush();
-                        numRecordsProcessed = 0;
-                        smallestOffset = Long.MAX_VALUE;
-                        largestOffset = Long.MIN_VALUE;
-                    }
+            public void init(final ProcessorContext<Void, Void> context) {
+                super.init(context);
+                System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                System.out.flush();
+                numRecordsProcessed = 0;
+                smallestOffset = Long.MAX_VALUE;
+                largestOffset = Long.MIN_VALUE;
+            }
 
-                    @Override
-                    public void process(final Object key, final Object value) {
-                        numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {
-                            System.out.printf("%s: %s%n", name, Instant.now());
-                            System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
-                        }
-
-                        if (smallestOffset > context().offset()) {
-                            smallestOffset = context().offset();
-                        }
-                        if (largestOffset < context().offset()) {
-                            largestOffset = context().offset();
-                        }
+            @Override
+            public void process(final Record<Object, Object> record) {
+                numRecordsProcessed++;
+                if (numRecordsProcessed % 100 == 0) {
+                    System.out.printf("%s: %s%n", name, Instant.now());
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+                }
+
+                if (context().recordMetadata().isPresent()) {
+                    if (smallestOffset > context().recordMetadata().get().offset()) {
+                        smallestOffset = context().recordMetadata().get().offset();
                     }
-
-                    @Override
-                    public void close() {
-                        System.out.printf("Close processor for task %s%n", context().taskId());
-                        System.out.println("processed " + numRecordsProcessed + " records");
-                        final long processed;
-                        if (largestOffset >= smallestOffset) {
-                            processed = 1L + largestOffset - smallestOffset;
-                        } else {
-                            processed = 0L;
-                        }
-                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
-                        System.out.flush();
+                    if (largestOffset < context().recordMetadata().get().offset()) {
+                        largestOffset = context().recordMetadata().get().offset();
                     }
-                };
+                }
+            }
+
+            @Override
+            public void close() {
+                System.out.printf("Close processor for task %s%n", context().taskId());
+                System.out.println("processed " + numRecordsProcessed + " records");
+                final long processed;
+                if (largestOffset >= smallestOffset) {
+                    processed = 1L + largestOffset - smallestOffset;
+                } else {
+                    processed = 0L;
+                }
+                System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                System.out.flush();
             }
         };
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index f20a532..94f623e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -89,7 +89,6 @@ public class StreamsUpgradeTest {
         });
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     public static KafkaStreams buildStreams(final Properties streamsProperties) {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<Void, Void> dataStream = builder.stream("data");