You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/09/22 19:27:07 UTC

[kafka] branch trunk updated: KAFKA-10544: Migrate KTable aggregate and reduce (#11316)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d15969a  KAFKA-10544: Migrate KTable aggregate and reduce (#11316)
d15969a is described below

commit d15969af286f4f239c0a972f4135ef429381ccd6
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Wed Sep 22 20:25:41 2021 +0100

    KAFKA-10544: Migrate KTable aggregate and reduce (#11316)
    
    As part of the migration of KStream/KTable operations to the new Processor API https://issues.apache.org/jira/browse/KAFKA-8410, this PR includes the migration of KTable aggregate/reduce operations.
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../kstream/internals/KGroupedTableImpl.java       | 45 +++++++--------
 .../streams/kstream/internals/KTableAggregate.java | 65 +++++++++++-----------
 .../streams/kstream/internals/KTableReduce.java    | 39 ++++++-------
 .../kstream/internals/KGroupedTableImplTest.java   | 13 ++---
 .../kstream/internals/KTableAggregateTest.java     | 13 ++---
 .../kstream/internals/KTableReduceTest.java        | 14 ++---
 6 files changed, 92 insertions(+), 97 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 111d169..da35dac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRep
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
@@ -67,11 +68,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         this.userProvidedRepartitionTopicName = groupedInternal.name();
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private <T> KTable<K, T> doAggregate(final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier,
+    private <VAgg> KTable<K, VAgg> doAggregate(final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier,
                                          final NamedInternal named,
                                          final String functionName,
-                                         final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
+                                         final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {
 
         final String sinkName = named.suffixWithOrElseGet("-sink", builder, KStreamImpl.SINK_NAME);
         final String sourceName = named.suffixWithOrElseGet("-source", builder, KStreamImpl.SOURCE_NAME);
@@ -145,8 +145,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         if (materializedInternal.valueSerde() == null) {
             materializedInternal.withValueSerde(valueSerde);
         }
-        @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-        final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
+        final ProcessorSupplier<K, Change<V>, K, Change<V>> aggregateSupplier = new KTableReduce<>(
             materializedInternal.storeName(),
             adder,
             subtractor);
@@ -177,8 +176,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
             materializedInternal.withValueSerde(Serdes.Long());
         }
 
-        @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-        final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
+        final ProcessorSupplier<K, Change<V>, K, Change<Long>> aggregateSupplier = new KTableAggregate<>(
             materializedInternal.storeName(),
             countInitializer,
             countAdder,
@@ -198,33 +196,32 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
     }
 
     @Override
-    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
-                                        final Aggregator<? super K, ? super V, VR> adder,
-                                        final Aggregator<? super K, ? super V, VR> subtractor,
-                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+    public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+                                        final Aggregator<? super K, ? super V, VAgg> adder,
+                                        final Aggregator<? super K, ? super V, VAgg> subtractor,
+                                        final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {
         return aggregate(initializer, adder, subtractor, NamedInternal.empty(), materialized);
     }
 
     @Override
-    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
-                                        final Aggregator<? super K, ? super V, VR> adder,
-                                        final Aggregator<? super K, ? super V, VR> subtractor,
+    public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+                                        final Aggregator<? super K, ? super V, VAgg> adder,
+                                        final Aggregator<? super K, ? super V, VAgg> subtractor,
                                         final Named named,
-                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+                                        final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(named, "named can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+        final MaterializedInternal<K, VAgg, KeyValueStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
 
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
-        @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-        final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
+        final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier = new KTableAggregate<>(
             materializedInternal.storeName(),
             initializer,
             adder,
@@ -233,17 +230,17 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
     }
 
     @Override
-    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
-                                      final Aggregator<? super K, ? super V, T> adder,
-                                      final Aggregator<? super K, ? super V, T> subtractor,
+    public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+                                      final Aggregator<? super K, ? super V, VAgg> adder,
+                                      final Aggregator<? super K, ? super V, VAgg> subtractor,
                                       final Named named) {
         return aggregate(initializer, adder, subtractor, named, Materialized.with(keySerde, null));
     }
 
     @Override
-    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
-                                      final Aggregator<? super K, ? super V, T> adder,
-                                      final Aggregator<? super K, ? super V, T> subtractor) {
+    public <VAgg> KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
+                                      final Aggregator<? super K, ? super V, VAgg> adder,
+                                      final Aggregator<? super K, ? super V, VAgg> subtractor) {
         return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null));
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 3ff4dfa..410a49b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -19,25 +19,27 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
+public class KTableAggregate<KIn, VIn, VAgg> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VAgg> {
 
     private final String storeName;
-    private final Initializer<T> initializer;
-    private final Aggregator<? super K, ? super V, T> add;
-    private final Aggregator<? super K, ? super V, T> remove;
+    private final Initializer<VAgg> initializer;
+    private final Aggregator<? super KIn, ? super VIn, VAgg> add;
+    private final Aggregator<? super KIn, ? super VIn, VAgg> remove;
 
     private boolean sendOldValues = false;
 
     KTableAggregate(final String storeName,
-                    final Initializer<T> initializer,
-                    final Aggregator<? super K, ? super V, T> add,
-                    final Aggregator<? super K, ? super V, T> remove) {
+                    final Initializer<VAgg> initializer,
+                    final Aggregator<? super KIn, ? super VIn, VAgg> add,
+                    final Aggregator<? super KIn, ? super VIn, VAgg> remove) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.add = add;
@@ -52,19 +54,18 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
+    public Processor<KIn, Change<VIn>, KIn, Change<VAgg>> get() {
         return new KTableAggregateProcessor();
     }
 
-    private class KTableAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
-        private TimestampedKeyValueStore<K, T> store;
-        private TimestampedTupleForwarder<K, T> tupleForwarder;
+    private class KTableAggregateProcessor implements Processor<KIn, Change<VIn>, KIn, Change<VAgg>> {
+        private TimestampedKeyValueStore<KIn, VAgg> store;
+        private TimestampedTupleForwarder<KIn, VAgg> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
-            super.init(context);
-            store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+        public void init(final ProcessorContext<KIn, Change<VAgg>> context) {
+            store = (TimestampedKeyValueStore<KIn, VAgg>) context.getStateStore(storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store,
                 context,
@@ -76,52 +77,52 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
          * @throws StreamsException if key is null
          */
         @Override
-        public void process(final K key, final Change<V> value) {
+        public void process(final Record<KIn, Change<VIn>> record) {
             // the keys should never be null
-            if (key == null) {
+            if (record.key() == null) {
                 throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
             }
 
-            final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
-            final T oldAgg = getValueOrNull(oldAggAndTimestamp);
-            final T intermediateAgg;
-            long newTimestamp = context().timestamp();
+            final ValueAndTimestamp<VAgg> oldAggAndTimestamp = store.get(record.key());
+            final VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
+            final VAgg intermediateAgg;
+            long newTimestamp = record.timestamp();
 
             // first try to remove the old value
-            if (value.oldValue != null && oldAgg != null) {
-                intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
-                newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+            if (record.value().oldValue != null && oldAgg != null) {
+                intermediateAgg = remove.apply(record.key(), record.value().oldValue, oldAgg);
+                newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
             } else {
                 intermediateAgg = oldAgg;
             }
 
             // then try to add the new value
-            final T newAgg;
-            if (value.newValue != null) {
-                final T initializedAgg;
+            final VAgg newAgg;
+            if (record.value().newValue != null) {
+                final VAgg initializedAgg;
                 if (intermediateAgg == null) {
                     initializedAgg = initializer.apply();
                 } else {
                     initializedAgg = intermediateAgg;
                 }
 
-                newAgg = add.apply(key, value.newValue, initializedAgg);
+                newAgg = add.apply(record.key(), record.value().newValue, initializedAgg);
                 if (oldAggAndTimestamp != null) {
-                    newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+                    newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
                 }
             } else {
                 newAgg = intermediateAgg;
             }
 
             // update the store with the new value
-            store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
-            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
+            store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
+            tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp);
         }
 
     }
 
     @Override
-    public KTableValueGetterSupplier<K, T> view() {
+    public KTableValueGetterSupplier<KIn, VAgg> view() {
         return new KTableMaterializedValueGetterSupplier<>(storeName);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 72f75ea..d43f6df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -18,13 +18,15 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
+public class KTableReduce<K, V> implements KTableNewProcessorSupplier<K, V, K, V> {
 
     private final String storeName;
     private final Reducer<V> addReducer;
@@ -46,19 +48,18 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
+    public Processor<K, Change<V>, K, Change<V>> get() {
         return new KTableReduceProcessor();
     }
 
-    private class KTableReduceProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
+    private class KTableReduceProcessor implements Processor<K, Change<V>, K, Change<V>> {
 
         private TimestampedKeyValueStore<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
-            super.init(context);
+        public void init(final ProcessorContext<K, Change<V>> context) {
             store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store,
@@ -71,42 +72,42 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
          * @throws StreamsException if key is null
          */
         @Override
-        public void process(final K key, final Change<V> value) {
+        public void process(final Record<K, Change<V>> record) {
             // the keys should never be null
-            if (key == null) {
+            if (record.key() == null) {
                 throw new StreamsException("Record key for KTable reduce operator with state " + storeName + " should not be null.");
             }
 
-            final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+            final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(record.key());
             final V oldAgg = getValueOrNull(oldAggAndTimestamp);
             final V intermediateAgg;
             long newTimestamp;
 
             // first try to remove the old value
-            if (value.oldValue != null && oldAgg != null) {
-                intermediateAgg = removeReducer.apply(oldAgg, value.oldValue);
-                newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+            if (record.value().oldValue != null && oldAgg != null) {
+                intermediateAgg = removeReducer.apply(oldAgg, record.value().oldValue);
+                newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
             } else {
                 intermediateAgg = oldAgg;
-                newTimestamp = context().timestamp();
+                newTimestamp = record.timestamp();
             }
 
             // then try to add the new value
             final V newAgg;
-            if (value.newValue != null) {
+            if (record.value().newValue != null) {
                 if (intermediateAgg == null) {
-                    newAgg = value.newValue;
+                    newAgg = record.value().newValue;
                 } else {
-                    newAgg = addReducer.apply(intermediateAgg, value.newValue);
-                    newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+                    newAgg = addReducer.apply(intermediateAgg, record.value().newValue);
+                    newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
                 }
             } else {
                 newAgg = intermediateAgg;
             }
 
             // update the store with the new value
-            store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
-            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
+            store.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp));
+            tupleForwarder.maybeForward(record.key(), newAgg, sendOldValues ? oldAgg : null, newTimestamp);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 39cd241..3b5295c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -34,9 +34,9 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -126,9 +126,8 @@ public class KGroupedTableImplTest {
             Materialized.as(INVALID_STORE_NAME)));
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private MockProcessorSupplier<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
-        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
+    private MockApiProcessorSupplier<String, Integer, Void, Void> getReducedResults(final KTable<String, Integer> inputKTable) {
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         inputKTable
             .toStream()
             .process(supplier);
@@ -173,7 +172,7 @@ public class KGroupedTableImplTest {
                 MockReducer.INTEGER_SUBTRACTOR,
                 Materialized.as("reduced"));
 
-        final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = getReducedResults(reduced);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
             assertEquals(reduced.queryableStoreName(), "reduced");
@@ -195,7 +194,7 @@ public class KGroupedTableImplTest {
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
 
-        final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = getReducedResults(reduced);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
             assertNull(reduced.queryableStoreName());
@@ -219,7 +218,7 @@ public class KGroupedTableImplTest {
                     .withKeySerde(Serdes.String())
                     .withValueSerde(Serdes.Integer()));
 
-        final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
+        final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = getReducedResults(reduced);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
             {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index a61ed12..220734f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -32,10 +32,10 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessor;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 import java.util.Properties;
@@ -49,12 +49,11 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.junit.Assert.assertEquals;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KTableAggregateTest {
     private final Serde<String> stringSerde = Serdes.String();
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     private final Grouped<String, String> stringSerialized = Grouped.with(stringSerde, stringSerde);
-    private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+    private final MockApiProcessorSupplier<String, Object, Void, Void> supplier = new MockApiProcessorSupplier<>();
     private final static Properties CONFIG = mkProperties(mkMap(
         mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())));
 
@@ -169,7 +168,7 @@ public class KTableAggregateTest {
 
     private static void testCountHelper(final StreamsBuilder builder,
                                         final String input,
-                                        final MockProcessorSupplier<String, Object> supplier) {
+                                        final MockApiProcessorSupplier<String, Object, Void, Void> supplier) {
         try (
             final TopologyTestDriver driver = new TopologyTestDriver(
                 builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
@@ -229,7 +228,7 @@ public class KTableAggregateTest {
     public void testRemoveOldBeforeAddNew() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
-        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
+        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
 
         builder
             .table(input, consumed)
@@ -253,7 +252,7 @@ public class KTableAggregateTest {
             final TestInputTopic<String, String> inputTopic =
                 driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
 
-            final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
+            final MockApiProcessor<String, String, Void, Void> proc = supplier.theCapturedProcessor();
 
             inputTopic.pipeInput("11", "A", 10L);
             inputTopic.pipeInput("12", "B", 8L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index 0f5cc6b..89aa17a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -31,14 +33,13 @@ import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KTableReduceTest {
 
     @Test
     public void shouldAddAndSubtract() {
         final InternalMockProcessorContext<String, Change<Set<String>>> context = new InternalMockProcessorContext<>();
 
-        final org.apache.kafka.streams.processor.Processor<String, Change<Set<String>>> reduceProcessor =
+        final Processor<String, Change<Set<String>>, String, Change<Set<String>>> reduceProcessor =
             new KTableReduce<String, Set<String>>(
                 "myStore",
                 this::unionNotNullArgs,
@@ -52,14 +53,11 @@ public class KTableReduceTest {
         reduceProcessor.init(context);
         context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore")));
 
-        context.setTime(10L);
-        reduceProcessor.process("A", new Change<>(singleton("a"), null));
+        reduceProcessor.process(new Record<>("A", new Change<>(singleton("a"), null), 10L));
         assertEquals(ValueAndTimestamp.make(singleton("a"), 10L), myStore.get("A"));
-        context.setTime(15L);
-        reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a")));
+        reduceProcessor.process(new Record<>("A", new Change<>(singleton("b"), singleton("a")), 15L));
         assertEquals(ValueAndTimestamp.make(singleton("b"), 15L), myStore.get("A"));
-        context.setTime(12L);
-        reduceProcessor.process("A", new Change<>(null, singleton("b")));
+        reduceProcessor.process(new Record<>("A", new Change<>(null, singleton("b")), 12L));
         assertEquals(ValueAndTimestamp.make(emptySet(), 15L), myStore.get("A"));
     }