Posted to commits@kafka.apache.org by vv...@apache.org on 2021/04/09 19:50:56 UTC

[kafka] branch trunk updated: KAFKA-8410: Migrate KStream Stateless operators to new Processor API (#10381)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c9cab2b  KAFKA-8410: Migrate KStream Stateless operators to new Processor API (#10381)
c9cab2b is described below

commit c9cab2beb8159ca83e6e2c930514ba97282e1ae9
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Fri Apr 9 20:49:54 2021 +0100

    KAFKA-8410: Migrate KStream Stateless operators to new Processor API (#10381)
    
    Migrate the KStream stateless operators to the new Processor API.
    Follow-up PRs will complete the migration of the KStream stateful operators and KTable.
    No functional changes are expected.
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../PrintedInternal.java => ForeachProcessor.java} | 22 +++++-----
 .../streams/kstream/internals/KStreamBranch.java   | 28 ++++++-------
 .../streams/kstream/internals/KStreamFilter.java   | 19 ++++-----
 .../streams/kstream/internals/KStreamFlatMap.java  | 25 +++++++-----
 .../kstream/internals/KStreamFlatMapValues.java    | 25 ++++++------
 .../streams/kstream/internals/KStreamImpl.java     |  5 ++-
 .../streams/kstream/internals/KStreamMap.java      | 25 +++++++-----
 .../kstream/internals/KStreamMapValues.java        | 23 ++++++-----
 .../streams/kstream/internals/KStreamPeek.java     | 25 +++++-------
 .../streams/kstream/internals/KStreamPrint.java    | 18 ++++-----
 .../streams/kstream/internals/KTableImpl.java      |  2 +-
 .../streams/kstream/internals/PassThrough.java     | 17 ++++----
 .../streams/kstream/internals/PrintedInternal.java |  4 +-
 .../streams/processor/api/ContextualProcessor.java | 47 ++++++++++++++++++++++
 .../apache/kafka/streams/kstream/PrintedTest.java  | 34 +++++++---------
 .../kstream/internals/KStreamBranchTest.java       |  4 +-
 .../kstream/internals/KStreamPrintTest.java        | 13 +++---
 17 files changed, 194 insertions(+), 142 deletions(-)
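
For context, here is a minimal illustrative sketch (not part of the commit) of the shape of the change each stateless operator in this diff undergoes: the old org.apache.kafka.streams.processor API passes the key and value separately and declares no output types, while the new org.apache.kafka.streams.processor.api API declares input and output types and passes a single Record. The class names OldStyle and NewStyle are hypothetical stand-ins for the per-operator logic.

    // Old API (org.apache.kafka.streams.processor), being migrated away from:
    //
    //     class OldStyle<K, V> extends AbstractProcessor<K, V> {
    //         @Override
    //         public void process(final K key, final V value) {
    //             context().forward(key, value);
    //         }
    //     }
    //
    // New API (org.apache.kafka.streams.processor.api), as adopted throughout this diff:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    class NewStyle<K, V> extends ContextualProcessor<K, V, K, V> {
        @Override
        public void process(final Record<K, V> record) {
            // A Record bundles key, value, timestamp, and headers; forwarding it
            // unchanged preserves all of them.
            context().forward(record);
        }
    }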

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java
similarity index 59%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java
index 546e353..cccd298 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachProcessor.java
@@ -14,21 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.streams.kstream.Printed;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 
-public class PrintedInternal<K, V> extends Printed<K, V> {
-    public PrintedInternal(final Printed<K, V> printed) {
-        super(printed);
-    }
+public class ForeachProcessor<K, V> implements Processor<K, V, Void, Void> {
+
+    private final ForeachAction<K, V> action;
 
-    public ProcessorSupplier<K, V> build(final String processorName) {
-        return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
+    public ForeachProcessor(final ForeachAction<K, V> action) {
+        this.action = action;
     }
 
-    public String name() {
-        return processorName;
+    @Override
+    public void process(final Record<K, V> record) {
+        action.apply(record.key(), record.value());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index 21b69f2..2d3fc76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -16,44 +16,44 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.To;
-
 import java.util.List;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
+class KStreamBranch<K, V> implements ProcessorSupplier<K, V, K, V> {
 
     private final List<Predicate<? super K, ? super V>> predicates;
     private final List<String> childNodes;
 
     KStreamBranch(final List<Predicate<? super K, ? super V>> predicates,
-                  final List<String> childNodes) {
+        final List<String> childNodes) {
         this.predicates = predicates;
         this.childNodes = childNodes;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new KStreamBranchProcessor();
     }
 
-    private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
+    private class KStreamBranchProcessor extends ContextualProcessor<K, V, K, V> {
+
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<K, V> record) {
             for (int i = 0; i < predicates.size(); i++) {
-                if (predicates.get(i).test(key, value)) {
+                if (predicates.get(i).test(record.key(), record.value())) {
                     // use forward with child here and then break the loop
                     // so that no record is going to be piped to multiple streams
-                    context().forward(key, value, To.child(childNodes.get(i)));
+                    context().forward(record, childNodes.get(i));
                     return;
                 }
             }
             // using default child node if supplied
             if (childNodes.size() > predicates.size()) {
-                context().forward(key, value, To.child(childNodes.get(predicates.size())));
+                context().forward(record, childNodes.get(predicates.size()));
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
index ac03c18..ffafd10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -16,12 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
+class KStreamFilter<K, V> implements ProcessorSupplier<K, V, K, V> {
 
     private final Predicate<K, V> predicate;
     private final boolean filterNot;
@@ -32,15 +33,15 @@ class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new KStreamFilterProcessor();
     }
 
-    private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
+    private class KStreamFilterProcessor extends ContextualProcessor<K, V, K, V> {
         @Override
-        public void process(final K key, final V value) {
-            if (filterNot ^ predicate.test(key, value)) {
-                context().forward(key, value);
+        public void process(final Record<K, V> record) {
+            if (filterNot ^ predicate.test(record.key(), record.value())) {
+                context().forward(record);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index e20ec90..b731a5c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -18,28 +18,31 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
+class KStreamFlatMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
 
-    private final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper;
+    private final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper;
 
-    KStreamFlatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    KStreamFlatMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<KIn, VIn, KOut, VOut> get() {
         return new KStreamFlatMapProcessor();
     }
 
-    private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
+    private class KStreamFlatMapProcessor extends ContextualProcessor<KIn, VIn, KOut, VOut> {
         @Override
-        public void process(final K key, final V value) {
-            for (final KeyValue<? extends K1, ? extends V1> newPair : mapper.apply(key, value)) {
-                context().forward(newPair.key, newPair.value);
+        public void process(final Record<KIn, VIn> record) {
+            final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> newKeyValues =
+                mapper.apply(record.key(), record.value());
+            for (final KeyValue<? extends KOut, ? extends VOut> newPair : newKeyValues) {
+                context().forward(record.withKey(newPair.key).withValue(newPair.value));
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index fedfe393..1008b29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -17,29 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
+class KStreamFlatMapValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
 
-    private final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper;
+    private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends Iterable<? extends VOut>> mapper;
 
-    KStreamFlatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends V1>> mapper) {
+    KStreamFlatMapValues(final ValueMapperWithKey<? super KIn, ? super VIn, ? extends Iterable<? extends VOut>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<KIn, VIn, KIn, VOut> get() {
         return new KStreamFlatMapValuesProcessor();
     }
 
-    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
+    private class KStreamFlatMapValuesProcessor extends ContextualProcessor<KIn, VIn, KIn, VOut> {
         @Override
-        public void process(final K key, final V value) {
-            final Iterable<? extends V1> newValues = mapper.apply(key, value);
-            for (final V1 v : newValues) {
-                context().forward(key, v);
+        public void process(final Record<KIn, VIn> record) {
+            final Iterable<? extends VOut> newValues = mapper.apply(record.key(), record.value());
+            for (final VOut v : newValues) {
+                context().forward(record.withValue(v));
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bacddc5..c013bd6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -61,6 +61,7 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.kstream.ForeachProcessor;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -406,7 +407,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME);
         final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
-            new ProcessorParameters<>(new KStreamPeek<>(action, false), name);
+            new ProcessorParameters<>(() -> new ForeachProcessor<>(action), name);
         final ProcessorGraphNode<? super K, ? super V> foreachNode =
             new ProcessorGraphNode<>(name, processorParameters);
 
@@ -426,7 +427,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
         final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
-            new ProcessorParameters<>(new KStreamPeek<>(action, true), name);
+            new ProcessorParameters<>(new KStreamPeek<>(action), name);
         final ProcessorGraphNode<? super K, ? super V> peekNode =
             new ProcessorGraphNode<>(name, processorParameters);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 8179ca8..0fec716 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -16,30 +16,33 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
+class KStreamMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
 
-    private final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper;
+    private final KeyValueMapper<? super KIn, ? super VIn, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper;
 
-    public KStreamMap(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
+    public KStreamMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<KIn, VIn, KOut, VOut> get() {
         return new KStreamMapProcessor();
     }
 
-    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
+    private class KStreamMapProcessor extends ContextualProcessor<KIn, VIn, KOut, VOut> {
+
         @Override
-        public void process(final K key, final V value) {
-            final KeyValue<? extends K1, ? extends V1> newPair = mapper.apply(key, value);
-            context().forward(newPair.key, newPair.value);
+        public void process(final Record<KIn, VIn> record) {
+            final KeyValue<? extends KOut, ? extends VOut> newPair =
+                mapper.apply(record.key(), record.value());
+            context().forward(record.withKey(newPair.key).withValue(newPair.value));
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 28c120e..f73bfdd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -17,28 +17,29 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
+class KStreamMapValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
 
-    private final ValueMapperWithKey<K, V, V1> mapper;
+    private final ValueMapperWithKey<KIn, VIn, VOut> mapper;
 
-    public KStreamMapValues(final ValueMapperWithKey<K, V, V1> mapper) {
+    public KStreamMapValues(final ValueMapperWithKey<KIn, VIn, VOut> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<KIn, VIn, KIn, VOut> get() {
         return new KStreamMapProcessor();
     }
 
-    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
+    private class KStreamMapProcessor extends ContextualProcessor<KIn, VIn, KIn, VOut> {
         @Override
-        public void process(final K readOnlyKey, final V value) {
-            final V1 newValue = mapper.apply(readOnlyKey, value);
-            context().forward(readOnlyKey, newValue);
+        public void process(final Record<KIn, VIn> record) {
+            final VOut newValue = mapper.apply(record.key(), record.value());
+            context().forward(record.withValue(newValue));
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
index 44d1d60..69b5e7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
@@ -17,32 +17,29 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamPeek<K, V> implements ProcessorSupplier<K, V> {
+class KStreamPeek<K, V> implements ProcessorSupplier<K, V, K, V> {
 
-    private final boolean forwardDownStream;
     private final ForeachAction<K, V> action;
 
-    public KStreamPeek(final ForeachAction<K, V> action, final boolean forwardDownStream) {
+    public KStreamPeek(final ForeachAction<K, V> action) {
         this.action = action;
-        this.forwardDownStream = forwardDownStream;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new KStreamPeekProcessor();
     }
 
-    private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+    private class KStreamPeekProcessor extends ContextualProcessor<K, V, K, V> {
         @Override
-        public void process(final K key, final V value) {
-            action.apply(key, value);
-            if (forwardDownStream) {
-                context().forward(key, value);
-            }
+        public void process(final Record<K, V> record) {
+            action.apply(record.key(), record.value());
+            context().forward(record);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
index 8ce698a..a04662c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
@@ -17,28 +17,28 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> {
+public class KStreamPrint<K, V> implements ProcessorSupplier<K, V, Void, Void> {
 
     private final ForeachAction<K, V> action;
-    
+
     public KStreamPrint(final ForeachAction<K, V> action) {
         this.action = action;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, Void, Void> get() {
         return new KStreamPrintProcessor();
     }
 
-    private class KStreamPrintProcessor extends AbstractProcessor<K, V> {
+    private class KStreamPrintProcessor implements Processor<K, V, Void, Void> {
 
         @Override
-        public void process(final K key, final V value) {
-            action.apply(key, value);
+        public void process(final Record<K, V> record) {
+            action.apply(record.key(), record.value());
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 98aee13..52f7b5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -484,7 +484,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         Objects.requireNonNull(named, "named can't be null");
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME);
-        final ProcessorSupplier<K, Change<V>> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
+        final KStreamMapValues<K, Change<V>, V> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
         final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
             new ProcessorParameters<>(kStreamMapValues, name)
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
index b83b3a4..f357a46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
@@ -16,21 +16,22 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class PassThrough<K, V> implements ProcessorSupplier<K, V> {
+class PassThrough<K, V> implements ProcessorSupplier<K, V, K, V> {
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new PassThroughProcessor<>();
     }
 
-    private static final class PassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
+    private static final class PassThroughProcessor<K, V> extends ContextualProcessor<K, V, K, V> {
         @Override
-        public void process(final K key, final V value) {
-            context().forward(key, value);
+        public void process(final Record<K, V> record) {
+            context().forward(record);
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
index 546e353..0cd1760 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -17,14 +17,14 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Printed;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 
 public class PrintedInternal<K, V> extends Printed<K, V> {
     public PrintedInternal(final Printed<K, V> printed) {
         super(printed);
     }
 
-    public ProcessorSupplier<K, V> build(final String processorName) {
+    public ProcessorSupplier<K, V, Void, Void> build(final String processorName) {
         return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
new file mode 100644
index 0000000..d2522e3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ContextualProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+/**
+ * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
+ * implementation of {@link #close()}.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+public abstract class ContextualProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
+
+    protected ProcessorContext<KOut, VOut> context;
+
+    protected ContextualProcessor() {}
+
+    @Override
+    public void init(final ProcessorContext<KOut, VOut> context) {
+        this.context = context;
+    }
+
+    /**
+     * Get the processor's context set during {@link #init(ProcessorContext) initialization}.
+     *
+     * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
+     */
+    protected final ProcessorContext<KOut, VOut> context() {
+        return context;
+    }
+}
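
As a further illustration (again not part of the commit), this is the pattern the migrated operators follow when they change the key or value: extend the new ContextualProcessor, derive a new Record with withKey()/withValue() so that the timestamp and headers of the input record are preserved, and forward the result. UpperCaseValues is a hypothetical example that mirrors the KStreamMapValues change above.

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;

    // Hypothetical operator, mirroring the KStreamMapValues pattern in this diff.
    class UpperCaseValues<K> implements ProcessorSupplier<K, String, K, String> {

        @Override
        public Processor<K, String, K, String> get() {
            return new ContextualProcessor<K, String, K, String>() {
                @Override
                public void process(final Record<K, String> record) {
                    // withValue() keeps the input record's key, timestamp, and headers.
                    context().forward(record.withValue(record.value().toUpperCase()));
                }
            };
        }
    }
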
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
index a56dbba..212074f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -19,8 +19,9 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.internals.PrintedInternal;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -59,11 +60,11 @@ public class PrintedTest {
     @Test
     public void shouldCreateProcessorThatPrintsToFile() throws IOException {
         final File file = TestUtils.tempFile();
-        final ProcessorSupplier<String, Integer> processorSupplier = new PrintedInternal<>(
+        final ProcessorSupplier<String, Integer, Void, Void> processorSupplier = new PrintedInternal<>(
                 Printed.<String, Integer>toFile(file.getPath()))
                 .build("processor");
-        final Processor<String, Integer> processor = processorSupplier.get();
-        processor.process("hi", 1);
+        final Processor<String, Integer, Void, Void> processor = processorSupplier.get();
+        processor.process(new Record<>("hi", 1, 0L));
         processor.close();
         try (final InputStream stream = Files.newInputStream(file.toPath())) {
             final byte[] data = new byte[stream.available()];
@@ -74,36 +75,31 @@ public class PrintedTest {
 
     @Test
     public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException {
-        final ProcessorSupplier<String, Integer> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
-        final Processor<String, Integer> processor = supplier.get();
+        final ProcessorSupplier<String, Integer, Void, Void> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
+        final Processor<String, Integer, Void, Void> processor = supplier.get();
 
-        processor.process("good", 2);
+        processor.process(new Record<>("good", 2, 0L));
         processor.close();
         assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: good, 2\n"));
     }
 
     @Test
     public void shouldPrintWithLabel() throws UnsupportedEncodingException {
-        final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withLabel("label"))
+        final Processor<String, Integer, Void, Void> processor = new PrintedInternal<>(sysOutPrinter.withLabel("label"))
                 .build("processor")
                 .get();
 
-        processor.process("hello", 3);
+        processor.process(new Record<>("hello", 3, 0L));
         processor.close();
         assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello, 3\n"));
     }
 
     @Test
     public void shouldPrintWithKeyValueMapper() throws UnsupportedEncodingException {
-        final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withKeyValueMapper(
-                new KeyValueMapper<String, Integer, String>() {
-                    @Override
-                    public String apply(final String key, final Integer value) {
-                        return String.format("%s -> %d", key, value);
-                    }
-                })).build("processor")
-                .get();
-        processor.process("hello", 1);
+        final Processor<String, Integer, Void, Void> processor = new PrintedInternal<>(
+            sysOutPrinter.withKeyValueMapper((key, value) -> String.format("%s -> %d", key, value))
+        ).build("processor").get();
+        processor.process(new Record<>("hello", 1, 0L));
         processor.close();
         assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: hello -> 1\n"));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index e33186e..1813522 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -60,8 +60,8 @@ public class KStreamBranchTest {
         assertEquals(3, branches.length);
 
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
-        for (int i = 0; i < branches.length; i++) {
-            branches[i].process(supplier);
+        for (final KStream<Integer, String> branch : branches) {
+            branch.process(supplier);
         }
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 9906556..2915a11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -17,8 +17,9 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +34,7 @@ import static org.junit.Assert.assertEquals;
 public class KStreamPrintTest {
 
     private ByteArrayOutputStream byteOutStream;
-    private Processor<Integer, String> printProcessor;
+    private Processor<Integer, String, Void, Void> printProcessor;
 
     @Before
     public void setUp() {
@@ -45,14 +46,13 @@ public class KStreamPrintTest {
             "test-stream"));
 
         printProcessor = kStreamPrint.get();
-        final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
+        final ProcessorContext<Void, Void> processorContext = EasyMock.createNiceMock(ProcessorContext.class);
         EasyMock.replay(processorContext);
 
         printProcessor.init(processorContext);
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void testPrintStreamWithProvidedKeyValueMapper() {
         final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
                 new KeyValue<>(0, "zero"),
@@ -67,7 +67,8 @@ public class KStreamPrintTest {
             "[test-stream]: 3, three"};
 
         for (final KeyValue<Integer, String> record: inputRecords) {
-            printProcessor.process(record.key, record.value);
+            final Record<Integer, String> r = new Record<>(record.key, record.value, 0L);
+            printProcessor.process(r);
         }
         printProcessor.close();