Posted to commits@kafka.apache.org by vv...@apache.org on 2020/08/10 02:09:36 UTC

[kafka] branch trunk updated: KAFKA-10261: Introduce the KIP-478 apis with adapters (#9004)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92828d5  KAFKA-10261: Introduce the KIP-478 apis with adapters (#9004)
92828d5 is described below

commit 92828d53b18703000159f4dd7dc8b3170667db25
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sun Aug 9 21:05:25 2020 -0500

    KAFKA-10261: Introduce the KIP-478 apis with adapters (#9004)
    
    Adds the new Processor and ProcessorContext interfaces
    as proposed in KIP-478. To integrate in a staged fashion
    with the code base, adapters are included to convert back
    and forth between the new and old APIs.
    
    ProcessorNode is converted to the new APIs.
    
    Reviewers: Boyang Chen <bo...@confluent.io>
---
 .../kafka/streams/processor/api/Processor.java     |  64 ++++++
 .../streams/processor/api/ProcessorContext.java    | 239 ++++++++++++++++++++
 .../streams/processor/api/ProcessorSupplier.java   |  43 ++++
 .../internals/AbstractProcessorContext.java        |   6 +-
 .../internals/GlobalProcessorContextImpl.java      |   8 +-
 .../processor/internals/GlobalStateUpdateTask.java |   6 +-
 ...ntext.java => InternalApiProcessorContext.java} |  14 +-
 .../internals/InternalProcessorContext.java        |   6 +-
 .../internals/InternalTopologyBuilder.java         | 171 +++++++-------
 .../processor/internals/PartitionGroup.java        |   2 +-
 .../processor/internals/ProcessorAdapter.java      |  53 +++++
 .../internals/ProcessorContextAdapter.java         | 234 +++++++++++++++++++
 .../processor/internals/ProcessorContextImpl.java  |  14 +-
 .../internals/ProcessorContextReverseAdapter.java  | 248 +++++++++++++++++++++
 .../streams/processor/internals/ProcessorNode.java |  27 ++-
 .../internals/ProcessorNodePunctuator.java         |   2 +-
 .../processor/internals/ProcessorTopology.java     |  44 ++--
 .../processor/internals/RecordDeserializer.java    |   6 +-
 .../streams/processor/internals/RecordQueue.java   |   6 +-
 .../streams/processor/internals/SinkNode.java      |  26 +--
 .../streams/processor/internals/SourceNode.java    |  24 +-
 .../streams/processor/internals/StreamTask.java    |  14 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |   2 +-
 .../kafka/streams/TopologyTestDriverWrapper.java   |   6 +-
 .../streams/kstream/internals/KStreamImplTest.java |   8 +-
 .../internals/GlobalProcessorContextImplTest.java  |  19 +-
 .../processor/internals/GlobalStateTaskTest.java   |  10 +-
 .../internals/InternalTopologyBuilderTest.java     |   4 +-
 .../internals/ProcessorContextImplTest.java        |   2 +-
 .../processor/internals/ProcessorNodeTest.java     |   4 +-
 .../internals/ProcessorTopologyFactories.java      |   4 +-
 .../processor/internals/PunctuationQueueTest.java  |   2 +-
 .../internals/RecordDeserializerTest.java          |   2 +-
 .../processor/internals/RecordQueueTest.java       |   2 +-
 .../streams/processor/internals/SinkNodeTest.java  |   4 +-
 .../processor/internals/SourceNodeTest.java        |   6 +-
 .../processor/internals/StreamTaskTest.java        |  18 +-
 .../kafka/test/InternalMockProcessorContext.java   |   6 +-
 .../org/apache/kafka/test/MockProcessorNode.java   |  12 +-
 .../java/org/apache/kafka/test/MockSourceNode.java |  10 +-
 40 files changed, 1130 insertions(+), 248 deletions(-)
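
For orientation before the diff: the heart of KIP-478 is that a processor now
declares its output types as well as its input types, and init() receives a
context bounded by those output types. A condensed, hedged summary of the two
signatures (not a verbatim copy of either file):

    // Old API (org.apache.kafka.streams.processor): only input types.
    //     interface Processor<K, V> { void process(K key, V value); ... }
    //
    // New API added here (org.apache.kafka.streams.processor.api):
    //     interface Processor<KIn, VIn, KOut, VOut> {
    //         default void init(ProcessorContext<KOut, VOut> context) {}
    //         void process(KIn key, VIn value);
    //         default void close() {}
    //     }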

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
new file mode 100644
index 0000000..d3656c7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.time.Duration;
+
+/**
+ * A processor of key-value pair records.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+public interface Processor<KIn, VIn, KOut, VOut> {
+
+    /**
+     * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
+     * that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the
+     * framework may later re-use the processor by calling {@code #init()} again.
+     * <p>
+     * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
+     * {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) schedule} a method to be
+     * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s.
+     *
+     * @param context the context; may not be null
+     */
+    default void init(final ProcessorContext<KOut, VOut> context) {}
+
+    /**
+     * Process the record with the given key and value.
+     *
+     * @param key the key for the record
+     * @param value the value for the record
+     */
+    void process(KIn key, VIn value);
+
+    /**
+     * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
+     * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may
+     * later re-use this processor by calling {@code #init()} on it again.
+     * <p>
+     * Note: Do not close any Streams-managed resources, such as {@link StateStore}s, here, as they are managed by the library.
+     */
+    default void close() {}
+}
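
A hedged example of implementing the new interface (the class name, types, and
logic below are illustrative, not part of this commit):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;

    // Consumes <String, String> records and emits <String, Integer> records.
    public class ValueLengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(final ProcessorContext<String, Integer> context) {
            this.context = context; // keep the typed context for forwarding
        }

        @Override
        public void process(final String key, final String value) {
            // The <String, Integer> output bound makes forwarding any other value
            // type a compile-time error instead of a runtime ClassCastException.
            context.forward(key, value == null ? 0 : value.length());
        }

        // close() is inherited as a no-op default; nothing to release here.
    }
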
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
new file mode 100644
index 0000000..4b98e6a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * Processor context interface.
+ *
+ * @param <KForward> a bound on the types of keys that may be forwarded
+ * @param <VForward> a bound on the types of values that may be forwarded
+ */
+public interface ProcessorContext<KForward, VForward> {
+
+    /**
+     * Returns the application id.
+     *
+     * @return the application id
+     */
+    String applicationId();
+
+    /**
+     * Returns the task id.
+     *
+     * @return the task id
+     */
+    TaskId taskId();
+
+    /**
+     * Returns the default key serde.
+     *
+     * @return the key serializer
+     */
+    Serde<?> keySerde();
+
+    /**
+     * Returns the default value serde.
+     *
+     * @return the value serializer
+     */
+    Serde<?> valueSerde();
+
+    /**
+     * Returns the state directory for the partition.
+     *
+     * @return the state directory
+     */
+    File stateDir();
+
+    /**
+     * Returns the metrics instance.
+     *
+     * @return the StreamsMetrics instance
+     */
+    StreamsMetrics metrics();
+
+    /**
+     * Registers and possibly restores the specified storage engine.
+     *
+     * @param store the storage engine
+     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
+     *
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's changelog does not contain the partition
+     */
+    void register(final StateStore store,
+                  final StateRestoreCallback stateRestoreCallback);
+
+    /**
+     * Get the state store given the store name.
+     *
+     * @param name The store name
+     * @return The state store instance
+     */
+    StateStore getStateStore(final String name);
+
+    /**
+     * Schedules a periodic operation for processors. A processor may call this method during
+     * {@link Processor#init(org.apache.kafka.streams.processor.ProcessorContext) initialization} or
+     * {@link Processor#process(Object, Object) processing} to
+     * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
+     * The type parameter controls what notion of time is used for punctuation:
+     * <ul>
+     *   <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", which is advanced by the processing of messages
+     *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is processed.
+     *   <b>NOTE:</b> Only advanced if messages arrive</li>
+     *   <li>{@link PunctuationType#WALL_CLOCK_TIME} &mdash; uses system time (the wall-clock time),
+     *   which is advanced independent of whether new messages arrive.
+     *   The first punctuation will be triggered after the interval has elapsed.
+     *   <b>NOTE:</b> This is best effort only as its granularity is limited by how long an iteration of the
+     *   processing loop takes to complete</li>
+     * </ul>
+     *
+     * <b>Skipping punctuations:</b> Punctuations will not be triggered more than once at any given timestamp.
+     * This means that "missed" punctuation will be skipped.
+     * It's possible to "miss" a punctuation if:
+     * <ul>
+     *   <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances by more than the interval</li>
+     *   <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pauses, too-short intervals, ...</li>
+     * </ul>
+     *
+     * @param interval the time interval between punctuations (supported minimum is 1 millisecond)
+     * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
+     * @param callback a function consuming timestamps representing the current stream or system time
+     * @return a handle allowing cancellation of the punctuation schedule established by this method
+     */
+    Cancellable schedule(final Duration interval,
+                         final PunctuationType type,
+                         final Punctuator callback);
+
+    /**
+     * Forwards a key/value pair to all downstream processors.
+     * Uses the input record's timestamp as the timestamp for the output record.
+     *
+     * @param key key
+     * @param value value
+     */
+    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+
+    /**
+     * Forwards a key/value pair to the specified downstream processors.
+     * Can be used to set the timestamp of the output record.
+     *
+     * @param key key
+     * @param value value
+     * @param to the options to use when forwarding
+     */
+    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+
+    /**
+     * Requests a commit.
+     */
+    void commit();
+
+    /**
+     * Returns the topic name of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
+     * @return the topic name
+     */
+    String topic();
+
+    /**
+     * Returns the partition id of the current input record; could be -1 if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
+     * @return the partition id
+     */
+    int partition();
+
+    /**
+     * Returns the offset of the current input record; could be -1 if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
+     * @return the offset
+     */
+    long offset();
+
+    /**
+     * Returns the headers of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
+     * @return the headers
+     */
+    Headers headers();
+
+    /**
+     * Returns the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * the timestamp is defined as the timestamp of the current input record, as extracted from the
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by the {@link TimestampExtractor}.
+     *
+     * <p> If it is triggered while processing a record not generated by the source processor (for example,
+     * if this method is invoked from the punctuate call), the timestamp is defined as the current
+     * task's stream time, i.e. the largest timestamp of any record processed by the task.
+     *
+     * @return the timestamp
+     */
+    long timestamp();
+
+    /**
+     * Returns all the application config properties as key/value pairs.
+     *
+     * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated with the ProcessorContext.
+     *
+     * <p> The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property
+     * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG}
+     * will be of type {@link Class}, even if it was specified as a String to
+     * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}).
+     *
+     * @return all the key/values from the StreamsConfig properties
+     */
+    Map<String, Object> appConfigs();
+
+    /**
+     * Returns all the application config properties with the given key prefix, as key/value pairs
+     * with the prefix stripped.
+     *
+     * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+     * object and associated with the ProcessorContext.
+     *
+     * @param prefix the properties prefix
+     * @return the key/values matching the given prefix from the StreamsConfig properties.
+     */
+    Map<String, Object> appConfigsWithPrefix(final String prefix);
+}
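
A hedged sketch tying together the scheduling and forwarding contracts above
(the interval, names, and logic are illustrative, not from this commit):

    import java.time.Duration;
    import org.apache.kafka.streams.processor.Cancellable;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;

    public class CountingProcessor implements Processor<String, Long, String, Long> {
        private ProcessorContext<String, Long> context;
        private Cancellable punctuation;
        private long count = 0L;

        @Override
        public void init(final ProcessorContext<String, Long> context) {
            this.context = context;
            // Wall-clock punctuation is best effort (see the schedule() javadoc);
            // this emits the running count roughly once per minute.
            this.punctuation = context.schedule(
                Duration.ofMinutes(1),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> this.context.forward("count", count));
        }

        @Override
        public void process(final String key, final Long value) {
            count++;
        }

        @Override
        public void close() {
            punctuation.cancel(); // tear the schedule down with the processor
        }
    }
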
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorSupplier.java
new file mode 100644
index 0000000..25c82c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+
+/**
+ * A processor supplier that can create one or more {@link Processor} instances.
+ *
+ * It is used in {@link Topology} for adding new processor operators, whose generated
+ * topology can then be replicated (thus creating one or more {@link Processor} instances)
+ * and distributed across multiple stream threads.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+@FunctionalInterface
+public interface ProcessorSupplier<KIn, VIn, KOut, VOut> extends ConnectedStoreProvider {
+
+    /**
+     * Return a new {@link Processor} instance.
+     *
+     * @return a new {@link Processor} instance
+     */
+    Processor<KIn, VIn, KOut, VOut> get();
+}
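
Because stores() has a default implementation in ConnectedStoreProvider, the
interface stays functional. A hedged usage sketch (ValueLengthProcessor is the
illustrative class from the earlier sketch, not part of this commit):

    import org.apache.kafka.streams.processor.api.ProcessorSupplier;

    public final class Suppliers {
        // Each task needs its own Processor instance, so get() must return a
        // new one on every call -- a constructor reference guarantees that.
        static ProcessorSupplier<String, String, String, Integer> valueLength() {
            return ValueLengthProcessor::new;
        }
    }
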
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 1befc05..6012817 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -42,7 +42,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     private final Serde<?> valueSerde;
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
-    protected ProcessorNode<?, ?> currentNode;
+    protected ProcessorNode<?, ?, ?, ?> currentNode;
     private long currentSystemTimeMs;
 
     protected ThreadCache cache;
@@ -196,12 +196,12 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
-    public void setCurrentNode(final ProcessorNode<?, ?> currentNode) {
+    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
         this.currentNode = currentNode;
     }
 
     @Override
-    public ProcessorNode<?, ?> currentNode() {
+    public ProcessorNode<?, ?, ?, ?> currentNode() {
         return currentNode;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 57989df..695eb77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -57,12 +57,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final K key, final V value) {
-        final ProcessorNode<?, ?> previousNode = currentNode();
+    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+        final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         try {
-            for (final ProcessorNode<?, ?> child : currentNode().children()) {
+            for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<K, V>) child).process(key, value);
+                ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
             }
         } finally {
             setCurrentNode(previousNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index b557330..700f135 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -68,7 +68,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
-            final SourceNode<?, ?> source = topology.source(sourceTopic);
+            final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic);
             deserializers.put(
                 sourceTopic,
                 new RecordDeserializer(
@@ -104,7 +104,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                     deserialized.headers());
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
-            ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
+            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
         }
 
         offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
@@ -131,7 +131,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     }
 
     private void initTopology() {
-        for (final ProcessorNode<?, ?> node : this.topology.processors()) {
+        for (final ProcessorNode<?, ?, ?, ?> node : this.topology.processors()) {
             processorContext.setCurrentNode(node);
             try {
                 node.init(this.processorContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
similarity index 86%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
index 8d2b324..39c3084 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -33,9 +31,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
  * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext extends ProcessorContext {
-    BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
-    ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
+public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {
 
     @Override
     StreamsMetricsImpl metrics();
@@ -46,7 +42,7 @@ public interface InternalProcessorContext extends ProcessorContext {
     void setSystemTimeMs(long timeMs);
 
     /**
-     * @retun the current wall-clock system timestamp in milliseconds
+     * @return the current wall-clock system timestamp in milliseconds
      */
     long currentSystemTimeMs();
 
@@ -64,12 +60,12 @@ public interface InternalProcessorContext extends ProcessorContext {
     /**
      * @param currentNode the current {@link ProcessorNode}
      */
-    void setCurrentNode(ProcessorNode<?, ?> currentNode);
+    void setCurrentNode(ProcessorNode<?, ?, ?, ?> currentNode);
 
     /**
      * Get the current {@link ProcessorNode}
      */
-    ProcessorNode<?, ?> currentNode();
+    ProcessorNode<?, ?, ?, ?> currentNode();
 
     /**
      * Get the thread-global cache
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 8d2b324..8e4ec25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -46,7 +46,7 @@ public interface InternalProcessorContext extends ProcessorContext {
     void setSystemTimeMs(long timeMs);
 
     /**
-     * @retun the current wall-clock system timestamp in milliseconds
+     * @return the current wall-clock system timestamp in milliseconds
      */
     long currentSystemTimeMs();
 
@@ -64,12 +64,12 @@ public interface InternalProcessorContext extends ProcessorContext {
     /**
      * @param currentNode the current {@link ProcessorNode}
      */
-    void setCurrentNode(ProcessorNode<?, ?> currentNode);
+    void setCurrentNode(ProcessorNode<?, ?, ?, ?> currentNode);
 
     /**
      * Get the current {@link ProcessorNode}
      */
-    ProcessorNode<?, ?> currentNode();
+    ProcessorNode<?, ?, ?, ?> currentNode();
 
     /**
      * Get the thread-global cache
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9844341..a915b60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -61,7 +61,7 @@ public class InternalTopologyBuilder {
     private static final String[] NO_PREDECESSORS = {};
 
     // node factories in a topological order
-    private final Map<String, NodeFactory<?, ?>> nodeFactories = new LinkedHashMap<>();
+    private final Map<String, NodeFactory<?, ?, ?, ?>> nodeFactories = new LinkedHashMap<>();
 
     private final Map<String, StateStoreFactory<?>> stateFactories = new HashMap<>();
 
@@ -176,7 +176,7 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private static abstract class NodeFactory<K, V> {
+    private static abstract class NodeFactory<KIn, VIn, KOut, VOut> {
         final String name;
         final String[] predecessors;
 
@@ -186,18 +186,18 @@ public class InternalTopologyBuilder {
             this.predecessors = predecessors;
         }
 
-        public abstract ProcessorNode<K, V> build();
+        public abstract ProcessorNode<KIn, VIn, KOut, VOut> build();
 
         abstract AbstractNode describe();
     }
 
-    private static class ProcessorNodeFactory<K, V> extends NodeFactory<K, V> {
-        private final ProcessorSupplier<K, V> supplier;
+    private static class ProcessorNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
+        private final ProcessorSupplier<KIn, VIn> supplier;
         private final Set<String> stateStoreNames = new HashSet<>();
 
         ProcessorNodeFactory(final String name,
                              final String[] predecessors,
-                             final ProcessorSupplier<K, V> supplier) {
+                             final ProcessorSupplier<KIn, VIn> supplier) {
             super(name, predecessors.clone());
             this.supplier = supplier;
         }
@@ -207,7 +207,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<K, V> build() {
+        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
             return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
         }
 
@@ -221,19 +221,19 @@ public class InternalTopologyBuilder {
     // even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory
     private final Map<String, Pattern> topicToPatterns = new HashMap<>();
 
-    private class SourceNodeFactory<K, V> extends NodeFactory<K, V> {
+    private class SourceNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
         private final List<String> topics;
         private final Pattern pattern;
-        private final Deserializer<K> keyDeserializer;
-        private final Deserializer<V> valDeserializer;
+        private final Deserializer<KIn> keyDeserializer;
+        private final Deserializer<VIn> valDeserializer;
         private final TimestampExtractor timestampExtractor;
 
         private SourceNodeFactory(final String name,
                                   final String[] topics,
                                   final Pattern pattern,
                                   final TimestampExtractor timestampExtractor,
-                                  final Deserializer<K> keyDeserializer,
-                                  final Deserializer<V> valDeserializer) {
+                                  final Deserializer<KIn> keyDeserializer,
+                                  final Deserializer<VIn> valDeserializer) {
             super(name, NO_PREDECESSORS);
             this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>();
             this.pattern = pattern;
@@ -269,7 +269,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<K, V> build() {
+        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
             return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);
         }
 
@@ -283,18 +283,18 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private class SinkNodeFactory<K, V> extends NodeFactory<K, V> {
-        private final Serializer<K> keySerializer;
-        private final Serializer<V> valSerializer;
-        private final StreamPartitioner<? super K, ? super V> partitioner;
-        private final TopicNameExtractor<K, V> topicExtractor;
+    private class SinkNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
+        private final Serializer<KIn> keySerializer;
+        private final Serializer<VIn> valSerializer;
+        private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
+        private final TopicNameExtractor<KIn, VIn> topicExtractor;
 
         private SinkNodeFactory(final String name,
                                 final String[] predecessors,
-                                final TopicNameExtractor<K, V> topicExtractor,
-                                final Serializer<K> keySerializer,
-                                final Serializer<V> valSerializer,
-                                final StreamPartitioner<? super K, ? super V> partitioner) {
+                                final TopicNameExtractor<KIn, VIn> topicExtractor,
+                                final Serializer<KIn> keySerializer,
+                                final Serializer<VIn> valSerializer,
+                                final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
             super(name, predecessors.clone());
             this.topicExtractor = topicExtractor;
             this.keySerializer = keySerializer;
@@ -303,9 +303,9 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<K, V> build() {
+        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
             if (topicExtractor instanceof StaticTopicNameExtractor) {
-                final String topic = ((StaticTopicNameExtractor<K, V>) topicExtractor).topicName;
+                final String topic = ((StaticTopicNameExtractor<KIn, VIn>) topicExtractor).topicName;
                 if (internalTopicNamesWithProperties.containsKey(topic)) {
                     // prefix the internal topic name with the application id
                     return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
@@ -318,7 +318,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        Sink<K, V> describe() {
+        Sink<KIn, VIn> describe() {
             return new Sink<>(name, topicExtractor);
         }
     }
@@ -532,14 +532,14 @@ public class InternalTopologyBuilder {
         nodeGroups = null;
     }
 
-    public final <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
-                                            final String sourceName,
-                                            final TimestampExtractor timestampExtractor,
-                                            final Deserializer<K> keyDeserializer,
-                                            final Deserializer<V> valueDeserializer,
-                                            final String topic,
-                                            final String processorName,
-                                            final ProcessorSupplier<K, V> stateUpdateSupplier) {
+    public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                            final String sourceName,
+                                                            final TimestampExtractor timestampExtractor,
+                                                            final Deserializer<KIn> keyDeserializer,
+                                                            final Deserializer<VIn> valueDeserializer,
+                                                            final String topic,
+                                                            final String processorName,
+                                                            final ProcessorSupplier<KIn, VIn> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "store builder must not be null");
         validateGlobalStoreArguments(sourceName,
                                      topic,
@@ -552,7 +552,7 @@ public class InternalTopologyBuilder {
         final String[] topics = {topic};
         final String[] predecessors = {sourceName};
 
-        final ProcessorNodeFactory<K, V> nodeFactory = new ProcessorNodeFactory<>(
+        final ProcessorNodeFactory<KIn, VIn, KOut, VOut> nodeFactory = new ProcessorNodeFactory<>(
             processorName,
             predecessors,
             stateUpdateSupplier
@@ -715,9 +715,9 @@ public class InternalTopologyBuilder {
         }
         stateStoreFactory.users().add(processorName);
 
-        final NodeFactory<?, ?> nodeFactory = nodeFactories.get(processorName);
+        final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(processorName);
         if (nodeFactory instanceof ProcessorNodeFactory) {
-            final ProcessorNodeFactory<?, ?> processorNodeFactory = (ProcessorNodeFactory<?, ?>) nodeFactory;
+            final ProcessorNodeFactory<?, ?, ?, ?> processorNodeFactory = (ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory;
             processorNodeFactory.addStateStore(stateStoreName);
             connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
         } else {
@@ -725,21 +725,21 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
-        final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>();
+    private Set<SourceNodeFactory<?, ?, ?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
+        final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodes = new HashSet<>();
         for (final String predecessor : predecessors) {
-            final NodeFactory<?, ?> nodeFactory = nodeFactories.get(predecessor);
+            final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(predecessor);
             if (nodeFactory instanceof SourceNodeFactory) {
-                sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
+                sourceNodes.add((SourceNodeFactory<?, ?, ?, ?>) nodeFactory);
             } else if (nodeFactory instanceof ProcessorNodeFactory) {
-                sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?>) nodeFactory).predecessors));
+                sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory).predecessors));
             }
         }
         return sourceNodes;
     }
 
-    private <K, V> void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
-                                                                     final ProcessorNodeFactory<K, V> processorNodeFactory) {
+    private <KIn, VIn, KOut, VOut> void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+                                                                                     final ProcessorNodeFactory<KIn, VIn, KOut, VOut> processorNodeFactory) {
         // we should never update the mapping from state store names to source topics if the store name already exists
         // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
         // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
@@ -751,10 +751,10 @@ public class InternalTopologyBuilder {
 
         final Set<String> sourceTopics = new HashSet<>();
         final Set<Pattern> sourcePatterns = new HashSet<>();
-        final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor =
+        final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodesForPredecessor =
             findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
 
-        for (final SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
+        for (final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
             if (sourceNodeFactory.pattern != null) {
                 sourcePatterns.add(sourceNodeFactory.pattern);
             } else {
@@ -763,13 +763,17 @@ public class InternalTopologyBuilder {
         }
 
         if (!sourceTopics.isEmpty()) {
-            stateStoreNameToSourceTopics.put(stateStoreName,
-                    Collections.unmodifiableSet(sourceTopics));
+            stateStoreNameToSourceTopics.put(
+                stateStoreName,
+                Collections.unmodifiableSet(sourceTopics)
+            );
         }
 
         if (!sourcePatterns.isEmpty()) {
-            stateStoreNameToSourceRegex.put(stateStoreName,
-                    Collections.unmodifiableSet(sourcePatterns));
+            stateStoreNameToSourceRegex.put(
+                stateStoreName,
+                Collections.unmodifiableSet(sourcePatterns)
+            );
         }
 
     }
@@ -880,40 +884,41 @@ public class InternalTopologyBuilder {
         return globalGroups;
     }
 
+    @SuppressWarnings("unchecked")
     private ProcessorTopology build(final Set<String> nodeGroup) {
         Objects.requireNonNull(applicationId, "topology has not completed optimization");
 
-        final Map<String, ProcessorNode<?, ?>> processorMap = new LinkedHashMap<>();
-        final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
-        final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
+        final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>();
+        final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new HashMap<>();
+        final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>();
         final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
         final Set<String> repartitionTopics = new HashSet<>();
 
         // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
         // also make sure the state store map values following the insertion ordering
-        for (final NodeFactory<?, ?> factory : nodeFactories.values()) {
+        for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                final ProcessorNode<?, ?> node = factory.build();
+                final ProcessorNode<?, ?, ?, ?> node = factory.build();
                 processorMap.put(node.name(), node);
 
                 if (factory instanceof ProcessorNodeFactory) {
                     buildProcessorNode(processorMap,
                                        stateStoreMap,
-                                       (ProcessorNodeFactory<?, ?>) factory,
-                                       node);
+                                       (ProcessorNodeFactory<?, ?, ?, ?>) factory,
+                                       (ProcessorNode<Object, Object, Object, Object>) node);
 
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
                                     repartitionTopics,
-                                    (SourceNodeFactory<?, ?>) factory,
-                                    (SourceNode<?, ?>) node);
+                                    (SourceNodeFactory<?, ?, ?, ?>) factory,
+                                    (SourceNode<?, ?, ?, ?>) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
                     buildSinkNode(processorMap,
                                   topicSinkMap,
                                   repartitionTopics,
-                                  (SinkNodeFactory<?, ?>) factory,
-                                  (SinkNode<?, ?>) node);
+                                  (SinkNodeFactory<?, ?, ?, ?>) factory,
+                                  (SinkNode<Object, Object, ?, ?>) node);
                 } else {
                     throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                 }
@@ -929,14 +934,14 @@ public class InternalTopologyBuilder {
                                      repartitionTopics);
     }
 
-    private void buildSinkNode(final Map<String, ProcessorNode<?, ?>> processorMap,
-                               final Map<String, SinkNode<?, ?>> topicSinkMap,
+    private void buildSinkNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
+                               final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap,
                                final Set<String> repartitionTopics,
-                               final SinkNodeFactory<?, ?> sinkNodeFactory,
-                               final SinkNode<?, ?> node) {
+                               final SinkNodeFactory<?, ?, ?, ?> sinkNodeFactory,
+                               final SinkNode<Object, Object, ?, ?> node) {
 
-        for (final String predecessor : sinkNodeFactory.predecessors) {
-            processorMap.get(predecessor).addChild(node);
+        for (final String predecessorName : sinkNodeFactory.predecessors) {
+            getProcessor(processorMap, predecessorName).addChild(node);
             if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) {
                 final String topic = ((StaticTopicNameExtractor<?, ?>) sinkNodeFactory.topicExtractor).topicName;
 
@@ -953,14 +958,22 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private void buildSourceNode(final Map<String, SourceNode<?, ?>> topicSourceMap,
+    @SuppressWarnings("unchecked")
+    private static <KIn, VIn, KOut, VOut> ProcessorNode<KIn, VIn, KOut, VOut> getProcessor(
+        final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
+        final String predecessor) {
+
+        return (ProcessorNode<KIn, VIn, KOut, VOut>) processorMap.get(predecessor);
+    }
+
+    private void buildSourceNode(final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap,
                                  final Set<String> repartitionTopics,
-                                 final SourceNodeFactory<?, ?> sourceNodeFactory,
-                                 final SourceNode<?, ?> node) {
+                                 final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory,
+                                 final SourceNode<?, ?, ?, ?> node) {
 
         final List<String> topics = (sourceNodeFactory.pattern != null) ?
-                                    sourceNodeFactory.getTopics(subscriptionUpdates()) :
-                                    sourceNodeFactory.topics;
+            sourceNodeFactory.getTopics(subscriptionUpdates()) :
+            sourceNodeFactory.topics;
 
         for (final String topic : topics) {
             if (internalTopicNamesWithProperties.containsKey(topic)) {
@@ -974,13 +987,13 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private void buildProcessorNode(final Map<String, ProcessorNode<?, ?>> processorMap,
+    private void buildProcessorNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
                                     final Map<String, StateStore> stateStoreMap,
-                                    final ProcessorNodeFactory<?, ?> factory,
-                                    final ProcessorNode<?, ?> node) {
+                                    final ProcessorNodeFactory<?, ?, ?, ?> factory,
+                                    final ProcessorNode<Object, Object, Object, Object> node) {
 
         for (final String predecessor : factory.predecessors) {
-            final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor);
+            final ProcessorNode<Object, Object, Object, Object> predecessorNode = getProcessor(processorMap, predecessor);
             predecessorNode.addChild(node);
         }
         for (final String stateStoreName : factory.stateStoreNames) {
@@ -1119,7 +1132,7 @@ public class InternalTopologyBuilder {
     private void setRegexMatchedTopicsToSourceNodes() {
         if (hasSubscriptionUpdates()) {
             for (final String nodeName : nodeToSourcePatterns.keySet()) {
-                final SourceNodeFactory<?, ?> sourceNode = (SourceNodeFactory<?, ?>) nodeFactories.get(nodeName);
+                final SourceNodeFactory<?, ?, ?, ?> sourceNode = (SourceNodeFactory<?, ?, ?, ?>) nodeFactories.get(nodeName);
                 final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates);
                 //need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex
                 nodeToSourceTopics.put(nodeName, sourceTopics);
@@ -1306,10 +1319,10 @@ public class InternalTopologyBuilder {
     }
 
     private boolean isGlobalSource(final String nodeName) {
-        final NodeFactory<?, ?> nodeFactory = nodeFactories.get(nodeName);
+        final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(nodeName);
 
         if (nodeFactory instanceof SourceNodeFactory) {
-            final List<String> topics = ((SourceNodeFactory<?, ?>) nodeFactory).topics;
+            final List<String> topics = ((SourceNodeFactory<?, ?, ?, ?>) nodeFactory).topics;
             return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0));
         }
         return false;
@@ -1348,7 +1361,7 @@ public class InternalTopologyBuilder {
                 description.addGlobalStore(new GlobalStore(
                     node,
                     processorNode,
-                    ((ProcessorNodeFactory<?, ?>) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+                    ((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
                     nodeToSourceTopics.get(node).get(0),
                     id
                 ));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 2bc5891..559e3b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -65,7 +65,7 @@ public class PartitionGroup {
     static class RecordInfo {
         RecordQueue queue;
 
-        ProcessorNode<?, ?> node() {
+        ProcessorNode<?, ?, ?, ?> node() {
             return queue.source();
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
new file mode 100644
index 0000000..c68829d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
+    private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
+
+    static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
+        if (delegate == null) {
+            return null;
+        } else {
+            return new ProcessorAdapter<>(delegate);
+        }
+    }
+
+    private ProcessorAdapter(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
+        this.delegate = delegate;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init(final ProcessorContext<KOut, VOut> context) {
+        delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+    }
+
+    @Override
+    public void process(final KIn key, final VIn value) {
+        delegate.process(key, value);
+    }
+
+    @Override
+    public void close() {
+        delegate.close();
+    }
+}
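
A hedged sketch of the round trip this adapter enables (placed in the internals
package because adapt(...) is package-private; the helper itself is illustrative):

    package org.apache.kafka.streams.processor.internals;

    import org.apache.kafka.streams.processor.api.Processor;

    final class AdapterSketch {
        // Given an old-API processor, as existing user code supplies it...
        static Processor<String, String, Object, Object> bridge(
            final org.apache.kafka.streams.processor.Processor<String, String> oldApi) {
            // ...wrap it so the converted ProcessorNode can drive it through the
            // new API. On init(), the wrapper re-adapts the new context back to
            // the old one via ProcessorContextReverseAdapter, closing the loop.
            return ProcessorAdapter.adapt(oldApi);
        }
    }
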
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
new file mode 100644
index 0000000..d11aaf3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class ProcessorContextAdapter<KForward, VForward>
+    implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {
+
+    private final InternalProcessorContext delegate;
+
+    @SuppressWarnings("unchecked")
+    static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> shim(final InternalProcessorContext delegate) {
+        if (delegate instanceof ProcessorContextReverseAdapter) {
+            return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
+        } else {
+            return new ProcessorContextAdapter<>(delegate);
+        }
+    }
+
+    private ProcessorContextAdapter(final InternalProcessorContext delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public String applicationId() {
+        return delegate.applicationId();
+    }
+
+    @Override
+    public TaskId taskId() {
+        return delegate.taskId();
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return delegate.keySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return delegate.valueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return delegate.stateDir();
+    }
+
+    @Override
+    public StreamsMetricsImpl metrics() {
+        return delegate.metrics();
+    }
+
+    @Override
+    public void setSystemTimeMs(final long timeMs) {
+        delegate.setSystemTimeMs(timeMs);
+    }
+
+    @Override
+    public long currentSystemTimeMs() {
+        return delegate.currentSystemTimeMs();
+    }
+
+    @Override
+    public ProcessorRecordContext recordContext() {
+        return delegate.recordContext();
+    }
+
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        delegate.setRecordContext(recordContext);
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
+        delegate.setCurrentNode(currentNode);
+    }
+
+    @Override
+    public ProcessorNode<?, ?, ?, ?> currentNode() {
+        return delegate.currentNode();
+    }
+
+    @Override
+    public ThreadCache cache() {
+        return delegate.cache();
+    }
+
+    @Override
+    public void initialize() {
+        delegate.initialize();
+    }
+
+    @Override
+    public void uninitialize() {
+        delegate.uninitialize();
+    }
+
+    @Override
+    public Task.TaskType taskType() {
+        return delegate.taskType();
+    }
+
+    @Override
+    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
+        delegate.transitionToActive(streamTask, recordCollector, newCache);
+    }
+
+    @Override
+    public void transitionToStandby(final ThreadCache newCache) {
+        delegate.transitionToStandby(newCache);
+    }
+
+    @Override
+    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
+        delegate.registerCacheFlushListener(namespace, listener);
+    }
+
+    @Override
+    public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
+        return delegate.getStateStore(builder);
+    }
+
+    @Override
+    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
+        delegate.logChange(storeName, key, value, timestamp);
+    }
+
+    @Override
+    public String changelogFor(final String storeName) {
+        return delegate.changelogFor(storeName);
+    }
+
+    @Override
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+        delegate.register(store, stateRestoreCallback);
+    }
+
+    @Override
+    public StateStore getStateStore(final String name) {
+        return delegate.getStateStore(name);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+        return delegate.schedule(interval, type, callback);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final K key, final V value) {
+        delegate.forward(key, value);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) {
+        delegate.forward(key, value, to);
+    }
+
+    @Override
+    public void commit() {
+        delegate.commit();
+    }
+
+    @Override
+    public String topic() {
+        return delegate.topic();
+    }
+
+    @Override
+    public int partition() {
+        return delegate.partition();
+    }
+
+    @Override
+    public long offset() {
+        return delegate.offset();
+    }
+
+    @Override
+    public Headers headers() {
+        return delegate.headers();
+    }
+
+    @Override
+    public long timestamp() {
+        return delegate.timestamp();
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        return delegate.appConfigs();
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return delegate.appConfigsWithPrefix(prefix);
+    }
+
+    InternalProcessorContext delegate() {
+        return delegate;
+    }
+}
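
Note the unwrap-before-wrap guard in shim(...) above: if the incoming context is
already a ProcessorContextReverseAdapter, its delegate is returned rather than
being wrapped again, so bouncing a context between the old and new APIs can never
stack adapters. A minimal, self-contained sketch of the idiom follows; OldApi,
NewApi, and the classes below are hypothetical stand-ins, not Kafka types.

    // Hypothetical stand-ins for the two context APIs.
    interface OldApi { String topic(); }
    interface NewApi { String topic(); }

    // New-API view over an old-API object.
    final class NewFromOld implements NewApi {
        final OldApi delegate;

        private NewFromOld(final OldApi delegate) { this.delegate = delegate; }

        static NewApi adapt(final OldApi oldApi) {
            // unwrap instead of double-wrapping, as in ProcessorContextAdapter.shim
            return oldApi instanceof OldFromNew ? ((OldFromNew) oldApi).delegate : new NewFromOld(oldApi);
        }

        @Override public String topic() { return delegate.topic(); }
    }

    // Old-API view over a new-API object.
    final class OldFromNew implements OldApi {
        final NewApi delegate;

        private OldFromNew(final NewApi delegate) { this.delegate = delegate; }

        static OldApi adapt(final NewApi newApi) {
            // the mirror-image guard, as in ProcessorContextReverseAdapter.adapt below
            return newApi instanceof NewFromOld ? ((NewFromOld) newApi).delegate : new OldFromNew(newApi);
        }

        @Override public String topic() { return delegate.topic(); }
    }
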
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 6baef83..d743d72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -196,7 +196,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                final V value,
                                final To to) {
         throwUnsupportedOperationExceptionIfStandby("forward");
-        final ProcessorNode<?, ?> previousNode = currentNode();
+        final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         final ProcessorRecordContext previousContext = recordContext;
 
         try {
@@ -212,12 +212,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
             final String sendTo = toInternal.child();
             if (sendTo == null) {
-                final List<ProcessorNode<?, ?>> children = currentNode().children();
-                for (final ProcessorNode<?, ?> child : children) {
-                    forward((ProcessorNode<K, V>) child, key, value);
+                final List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
+                for (final ProcessorNode<?, ?, ?, ?> child : children) {
+                    forward((ProcessorNode<K, V, ?, ?>) child, key, value);
                 }
             } else {
-                final ProcessorNode<K, V> child = currentNode().getChild(sendTo);
+                final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);
                 if (child == null) {
                     throw new StreamsException("Unknown downstream node: " + sendTo
                         + " either does not exist or is not connected to this processor.");
@@ -230,7 +230,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private <K, V> void forward(final ProcessorNode<K, V> child,
+    private <K, V> void forward(final ProcessorNode<K, V, ?, ?> child,
                                 final K key,
                                 final V value) {
         setCurrentNode(child);
@@ -293,7 +293,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     }
 
     @Override
-    public ProcessorNode<?, ?> currentNode() {
+    public ProcessorNode<?, ?, ?, ?> currentNode() {
         throwUnsupportedOperationExceptionIfStandby("currentNode");
         return super.currentNode();
     }
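
The cast to ProcessorNode<K, V, ?, ?> in forward() above remains unchecked: the
context cannot statically prove that its own K and V match a child's declared
input types, and erasure means the cast itself verifies nothing at runtime. The
mismatch only surfaces once the child processor touches the mistyped value,
which is why ProcessorNode#process (later in this commit) catches
ClassCastException and rethrows it as a StreamsException. A toy sketch of that
failure mode, using hypothetical Node classes rather than Kafka types:

    final class ErasedCastDemo {
        static class Node<K, V> {
            void process(final K key, final V value) { }
        }

        static final class UpperCaseNode extends Node<String, String> {
            @Override
            void process(final String key, final String value) {
                System.out.println(key.toUpperCase());
            }
        }

        @SuppressWarnings("unchecked")
        public static void main(final String[] args) {
            final Node<?, ?> child = new UpperCaseNode();
            // compiles with only an "unchecked" warning; erasure checks nothing here
            final Node<Integer, Integer> typed = (Node<Integer, Integer>) child;
            // throws ClassCastException from the compiler-generated bridge method, not from the cast
            typed.process(1, 2);
        }
    }
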
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
new file mode 100644
index 0000000..6e82a5e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
+    private final InternalApiProcessorContext<Object, Object> delegate;
+
+    static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+        if (delegate instanceof ProcessorContextAdapter) {
+            return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
+        } else {
+            return new ProcessorContextReverseAdapter(delegate);
+        }
+    }
+
+    private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public String applicationId() {
+        return delegate.applicationId();
+    }
+
+    @Override
+    public TaskId taskId() {
+        return delegate.taskId();
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return delegate.keySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return delegate.valueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return delegate.stateDir();
+    }
+
+    @Override
+    public StreamsMetricsImpl metrics() {
+        return delegate.metrics();
+    }
+
+    @Override
+    public void setSystemTimeMs(final long timeMs) {
+        delegate.setSystemTimeMs(timeMs);
+    }
+
+    @Override
+    public long currentSystemTimeMs() {
+        return delegate.currentSystemTimeMs();
+    }
+
+    @Override
+    public ProcessorRecordContext recordContext() {
+        return delegate.recordContext();
+    }
+
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        delegate.setRecordContext(recordContext);
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
+        delegate.setCurrentNode(currentNode);
+    }
+
+    @Override
+    public ProcessorNode<?, ?, ?, ?> currentNode() {
+        return delegate.currentNode();
+    }
+
+    @Override
+    public ThreadCache cache() {
+        return delegate.cache();
+    }
+
+    @Override
+    public void initialize() {
+        delegate.initialize();
+    }
+
+    @Override
+    public void uninitialize() {
+        delegate.uninitialize();
+    }
+
+    @Override
+    public Task.TaskType taskType() {
+        return delegate.taskType();
+    }
+
+    @Override
+    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
+        delegate.transitionToActive(streamTask, recordCollector, newCache);
+    }
+
+    @Override
+    public void transitionToStandby(final ThreadCache newCache) {
+        delegate.transitionToStandby(newCache);
+    }
+
+    @Override
+    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
+        delegate.registerCacheFlushListener(namespace, listener);
+    }
+
+    @Override
+    public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
+        return delegate.getStateStore(builder);
+    }
+
+    @Override
+    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
+        delegate.logChange(storeName, key, value, timestamp);
+    }
+
+    @Override
+    public String changelogFor(final String storeName) {
+        return delegate.changelogFor(storeName);
+    }
+
+    @Override
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+        delegate.register(store, stateRestoreCallback);
+    }
+
+    @Override
+    public StateStore getStateStore(final String name) {
+        return delegate.getStateStore(name);
+    }
+
+    @Deprecated
+    @Override
+    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+        return delegate.schedule(Duration.ofMillis(intervalMs), type, callback);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
+        return delegate.schedule(interval, type, callback);
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        delegate.forward(key, value);
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        delegate.forward(key, value, to);
+    }
+
+    @Deprecated
+    @Override
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        delegate.forward(key, value, To.child(currentNode().children().get(childIndex).name()));
+    }
+
+    @Deprecated
+    @Override
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        delegate.forward(key, value, To.child(childName));
+    }
+
+    @Override
+    public void commit() {
+        delegate.commit();
+    }
+
+    @Override
+    public String topic() {
+        return delegate.topic();
+    }
+
+    @Override
+    public int partition() {
+        return delegate.partition();
+    }
+
+    @Override
+    public long offset() {
+        return delegate.offset();
+    }
+
+    @Override
+    public Headers headers() {
+        return delegate.headers();
+    }
+
+    @Override
+    public long timestamp() {
+        return delegate.timestamp();
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        return delegate.appConfigs();
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return delegate.appConfigsWithPrefix(prefix);
+    }
+
+    InternalApiProcessorContext<Object, Object> delegate() {
+        return delegate;
+    }
+}
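
Because both factories guard symmetrically, adapting a context back and forth is
the identity on the underlying object rather than a growing wrapper chain.
Continuing the hypothetical OldApi/NewApi sketch from above:

    final class AdapterRoundTripDemo {
        public static void main(final String[] args) {
            final OldApi oldCtx = () -> "input-topic";  // stand-in for an old-API context
            final OldApi roundTripped = OldFromNew.adapt(NewFromOld.adapt(oldCtx));
            System.out.println(roundTripped == oldCtx); // true: the wrappers cancel out
        }
    }
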
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index a91eb6f..b489e62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -33,13 +33,13 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
-public class ProcessorNode<K, V> {
+public class ProcessorNode<KIn, VIn, KOut, VOut> {
 
     // TODO: 'children' can be removed when #forward() via index is removed
-    private final List<ProcessorNode<?, ?>> children;
-    private final Map<String, ProcessorNode<?, ?>> childByName;
+    private final List<ProcessorNode<KOut, VOut, ?, ?>> children;
+    private final Map<String, ProcessorNode<KOut, VOut, ?, ?>> childByName;
 
-    private final Processor<K, V> processor;
+    private final Processor<KIn, VIn, KOut, VOut> processor;
     private final String name;
     private final Time time;
 
@@ -57,9 +57,11 @@ public class ProcessorNode<K, V> {
         this(name, null, null);
     }
 
-    public ProcessorNode(final String name, final Processor<K, V> processor, final Set<String> stateStores) {
+    public ProcessorNode(final String name,
+                         final org.apache.kafka.streams.processor.Processor<KIn, VIn> processor,
+                         final Set<String> stateStores) {
         this.name = name;
-        this.processor = processor;
+        this.processor = ProcessorAdapter.adapt(processor);
         this.children = new ArrayList<>();
         this.childByName = new HashMap<>();
         this.stateStores = stateStores;
@@ -70,11 +73,11 @@ public class ProcessorNode<K, V> {
         return name;
     }
 
-    public final Processor<K, V> processor() {
+    public final Processor<KIn, VIn, KOut, VOut> processor() {
         return processor;
     }
 
-    public List<ProcessorNode<?, ?>> children() {
+    public List<ProcessorNode<KOut, VOut, ?, ?>> children() {
         return children;
     }
 
@@ -82,7 +85,7 @@ public class ProcessorNode<K, V> {
         return childByName.get(childName);
     }
 
-    public void addChild(final ProcessorNode<?, ?> child) {
+    public void addChild(final ProcessorNode<KOut, VOut, ?, ?> child) {
         children.add(child);
         childByName.put(child.name, child);
     }
@@ -94,7 +97,7 @@ public class ProcessorNode<K, V> {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init(context);
+                        processor.init(ProcessorContextAdapter.shim(context));
                     }
                 },
                 time,
@@ -137,7 +140,7 @@ public class ProcessorNode<K, V> {
     }
 
 
-    public void process(final K key, final V value) {
+    public void process(final KIn key, final VIn value) {
         try {
             maybeMeasureLatency(() -> processor.process(key, value), time, processSensor);
         } catch (final ClassCastException e) {
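
With the adapters in place, ProcessorNode itself now speaks the four-generic API:
the constructor above wraps old-style processors via ProcessorAdapter.adapt(...),
and init() hands the context through ProcessorContextAdapter.shim(...). For
comparison, a processor written directly against the new interface might look
like the sketch below; it assumes, from the calls visible above, that
api.Processor exposes init(api.ProcessorContext<KOut, VOut>) and
process(key, value) with a no-op default close(). The class name is hypothetical.

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;

    // Consumes <String, String> records and may only forward <String, Integer>:
    // the KOut/VOut parameters turn forwarding mistakes into compile errors
    // instead of the runtime ClassCastExceptions handled in process() above.
    public class WordLengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(final ProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(final String key, final String value) {
            // forward(key, value) would not compile here, since value is not an Integer
            context.forward(key, value == null ? 0 : value.length());
        }
    }
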
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
index 405b1b6..5544adc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
@@ -21,6 +21,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 
 public interface ProcessorNodePunctuator {
 
-    void punctuate(ProcessorNode<?, ?> node, long timestamp, PunctuationType type, Punctuator punctuator);
+    void punctuate(ProcessorNode<?, ?, ?, ?> node, long timestamp, PunctuationType type, Punctuator punctuator);
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 64d2adc..75f3688 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -31,10 +31,10 @@ import org.slf4j.LoggerFactory;
 public class ProcessorTopology {
     private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class);
 
-    private final List<ProcessorNode<?, ?>> processorNodes;
-    private final Map<String, SourceNode<?, ?>> sourceNodesByName;
-    private final Map<String, SourceNode<?, ?>> sourceNodesByTopic;
-    private final Map<String, SinkNode<?, ?>> sinksByTopic;
+    private final List<ProcessorNode<?, ?, ?, ?>> processorNodes;
+    private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByName;
+    private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic;
+    private final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic;
     private final Set<String> terminalNodes;
     private final List<StateStore> stateStores;
     private final Set<String> repartitionTopics;
@@ -43,9 +43,9 @@ public class ProcessorTopology {
     private final List<StateStore> globalStateStores;
     private final Map<String, String> storeToChangelogTopic;
 
-    public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes,
-                             final Map<String, SourceNode<?, ?>> sourceNodesByTopic,
-                             final Map<String, SinkNode<?, ?>> sinksByTopic,
+    public ProcessorTopology(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+                             final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic,
+                             final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic,
                              final List<StateStore> stateStores,
                              final List<StateStore> globalStateStores,
                              final Map<String, String> storeToChangelogTopic,
@@ -59,14 +59,14 @@ public class ProcessorTopology {
         this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
 
         this.terminalNodes = new HashSet<>();
-        for (final ProcessorNode<?, ?> node : processorNodes) {
+        for (final ProcessorNode<?, ?, ?, ?> node : processorNodes) {
             if (node.isTerminalNode()) {
                 terminalNodes.add(node.name());
             }
         }
 
         this.sourceNodesByName = new HashMap<>();
-        for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) {
+        for (final SourceNode<?, ?, ?, ?> source : sourceNodesByTopic.values()) {
             sourceNodesByName.put(source.name(), source);
         }
     }
@@ -75,11 +75,11 @@ public class ProcessorTopology {
         return sourceNodesByTopic.keySet();
     }
 
-    public SourceNode<?, ?> source(final String topic) {
+    public SourceNode<?, ?, ?, ?> source(final String topic) {
         return sourceNodesByTopic.get(topic);
     }
 
-    public Set<SourceNode<?, ?>> sources() {
+    public Set<SourceNode<?, ?, ?, ?>> sources() {
         return new HashSet<>(sourceNodesByTopic.values());
     }
 
@@ -87,7 +87,7 @@ public class ProcessorTopology {
         return sinksByTopic.keySet();
     }
 
-    public SinkNode<?, ?> sink(final String topic) {
+    public SinkNode<?, ?, ?, ?> sink(final String topic) {
         return sinksByTopic.get(topic);
     }
 
@@ -95,7 +95,7 @@ public class ProcessorTopology {
         return terminalNodes;
     }
 
-    public List<ProcessorNode<?, ?>> processors() {
+    public List<ProcessorNode<?, ?, ?, ?>> processors() {
         return processorNodes;
     }
 
@@ -163,13 +163,13 @@ public class ProcessorTopology {
         }
     }
 
-    private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) {
+    private String childrenToString(final String indent, final List<? extends ProcessorNode<?, ?, ?, ?>> children) {
         if (children == null || children.isEmpty()) {
             return "";
         }
 
         final StringBuilder sb = new StringBuilder(indent + "\tchildren:\t[");
-        for (final ProcessorNode<?, ?> child : children) {
+        for (final ProcessorNode<?, ?, ?, ?> child : children) {
             sb.append(child.name());
             sb.append(", ");
         }
@@ -177,7 +177,7 @@ public class ProcessorTopology {
         sb.append("]\n");
 
         // recursively print children
-        for (final ProcessorNode<?, ?> child : children) {
+        for (final ProcessorNode<?, ?, ?, ?> child : children) {
             sb.append(child.toString(indent)).append(childrenToString(indent, child.children()));
         }
         return sb.toString();
@@ -199,10 +199,10 @@ public class ProcessorTopology {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
-        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
-        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) {
+        final Map<SourceNode<?, ?, ?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) {
             final String topic = sourceNodeEntry.getKey();
-            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getValue();
             sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
             sourceToTopics.get(source).add(topic);
         }
@@ -210,8 +210,8 @@ public class ProcessorTopology {
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
-            final SourceNode<?, ?> source = sourceNodeEntry.getKey();
+        for (final Map.Entry<SourceNode<?, ?, ?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
+            final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getKey();
             final List<String> topics = sourceNodeEntry.getValue();
             sb.append(source.toString(indent + "\t"))
                 .append(topicsToString(indent + "\t", topics))
@@ -234,7 +234,7 @@ public class ProcessorTopology {
 
     // for testing only
     public Set<String> processorConnectedStateStores(final String processorName) {
-        for (final ProcessorNode<?, ?> node : processorNodes) {
+        for (final ProcessorNode<?, ?, ?, ?> node : processorNodes) {
             if (node.name().equals(processorName)) {
                 return node.stateStores;
             }
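
A small generics detail above: childrenToString() now takes
List<? extends ProcessorNode<?, ?, ?, ?>> because Java generics are invariant,
so the List<ProcessorNode<KOut, VOut, ?, ?>> returned by children() is not
itself a List<ProcessorNode<?, ?, ?, ?>>. A compact sketch with hypothetical
names:

    import java.util.ArrayList;
    import java.util.List;

    final class InvarianceDemo {
        static class Node<A, B> { }

        static void takesExact(final List<Node<?, ?>> nodes) { }

        static void takesBounded(final List<? extends Node<?, ?>> nodes) { }

        public static void main(final String[] args) {
            final List<Node<String, Integer>> children = new ArrayList<>();
            // takesExact(children);  // does not compile: not a List<Node<?, ?>>
            takesBounded(children);   // compiles: the bounded wildcard accepts it
        }
    }
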
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 86b1b44..269dff2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -29,11 +29,11 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXC
 
 class RecordDeserializer {
     private final Logger log;
-    private final SourceNode<?, ?> sourceNode;
+    private final SourceNode<?, ?, ?, ?> sourceNode;
     private final Sensor droppedRecordsSensor;
     private final DeserializationExceptionHandler deserializationExceptionHandler;
 
-    RecordDeserializer(final SourceNode<?, ?> sourceNode,
+    RecordDeserializer(final SourceNode<?, ?, ?, ?> sourceNode,
                        final DeserializationExceptionHandler deserializationExceptionHandler,
                        final LogContext logContext,
                        final Sensor droppedRecordsSensor) {
@@ -98,7 +98,7 @@ class RecordDeserializer {
         }
     }
 
-    SourceNode<?, ?> sourceNode() {
+    SourceNode<?, ?, ?, ?> sourceNode() {
         return sourceNode;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 6f0db8a..df7e834 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -39,7 +39,7 @@ public class RecordQueue {
     public static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
 
     private final Logger log;
-    private final SourceNode<?, ?> source;
+    private final SourceNode<?, ?, ?, ?> source;
     private final TopicPartition partition;
     private final ProcessorContext processorContext;
     private final TimestampExtractor timestampExtractor;
@@ -52,7 +52,7 @@ public class RecordQueue {
     private final Sensor droppedRecordsSensor;
 
     RecordQueue(final TopicPartition partition,
-                final SourceNode<?, ?> source,
+                final SourceNode<?, ?, ?, ?> source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler deserializationExceptionHandler,
                 final InternalProcessorContext processorContext,
@@ -85,7 +85,7 @@ public class RecordQueue {
      *
      * @return SourceNode
      */
-    public SourceNode<?, ?> source() {
+    public SourceNode<?, ?, ?, ?> source() {
         return source;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 9b0a254..a77deb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -22,20 +22,20 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 
-public class SinkNode<K, V> extends ProcessorNode<K, V> {
+public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
-    private Serializer<K> keySerializer;
-    private Serializer<V> valSerializer;
-    private final TopicNameExtractor<K, V> topicExtractor;
-    private final StreamPartitioner<? super K, ? super V> partitioner;
+    private Serializer<KIn> keySerializer;
+    private Serializer<VIn> valSerializer;
+    private final TopicNameExtractor<KIn, VIn> topicExtractor;
+    private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
 
     private InternalProcessorContext context;
 
     SinkNode(final String name,
-             final TopicNameExtractor<K, V> topicExtractor,
-             final Serializer<K> keySerializer,
-             final Serializer<V> valSerializer,
-             final StreamPartitioner<? super K, ? super V> partitioner) {
+             final TopicNameExtractor<KIn, VIn> topicExtractor,
+             final Serializer<KIn> keySerializer,
+             final Serializer<VIn> valSerializer,
+             final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
         super(name);
 
         this.topicExtractor = topicExtractor;
@@ -48,7 +48,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
      * @throws UnsupportedOperationException always, since a sink node cannot have children
      */
     @Override
-    public void addChild(final ProcessorNode<?, ?> child) {
+    public void addChild(final ProcessorNode<KOut, VOut, ?, ?> child) {
         throw new UnsupportedOperationException("sink node does not allow addChild");
     }
 
@@ -60,10 +60,10 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
         // if serializers are null, get the default ones from the context
         if (keySerializer == null) {
-            keySerializer = (Serializer<K>) context.keySerde().serializer();
+            keySerializer = (Serializer<KIn>) context.keySerde().serializer();
         }
         if (valSerializer == null) {
-            valSerializer = (Serializer<V>) context.valueSerde().serializer();
+            valSerializer = (Serializer<VIn>) context.valueSerde().serializer();
         }
 
         // if serializers are internal wrapping serializers that may need to be given the default serializer
@@ -78,7 +78,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
 
     @Override
-    public void process(final K key, final V value) {
+    public void process(final KIn key, final VIn value) {
         final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
 
         final long timestamp = context.timestamp();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 8508a7d..6dbdfee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -23,18 +23,18 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
-public class SourceNode<K, V> extends ProcessorNode<K, V> {
+public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private InternalProcessorContext context;
-    private Deserializer<K> keyDeserializer;
-    private Deserializer<V> valDeserializer;
+    private Deserializer<KIn> keyDeserializer;
+    private Deserializer<VIn> valDeserializer;
     private final TimestampExtractor timestampExtractor;
     private Sensor processAtSourceSensor;
 
     public SourceNode(final String name,
                       final TimestampExtractor timestampExtractor,
-                      final Deserializer<K> keyDeserializer,
-                      final Deserializer<V> valDeserializer) {
+                      final Deserializer<KIn> keyDeserializer,
+                      final Deserializer<VIn> valDeserializer) {
         super(name);
         this.timestampExtractor = timestampExtractor;
         this.keyDeserializer = keyDeserializer;
@@ -42,16 +42,16 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
     }
 
     public SourceNode(final String name,
-                      final Deserializer<K> keyDeserializer,
-                      final Deserializer<V> valDeserializer) {
+                      final Deserializer<KIn> keyDeserializer,
+                      final Deserializer<VIn> valDeserializer) {
         this(name, null, keyDeserializer, valDeserializer);
     }
 
-    K deserializeKey(final String topic, final Headers headers, final byte[] data) {
+    KIn deserializeKey(final String topic, final Headers headers, final byte[] data) {
         return keyDeserializer.deserialize(topic, headers, data);
     }
 
-    V deserializeValue(final String topic, final Headers headers, final byte[] data) {
+    VIn deserializeValue(final String topic, final Headers headers, final byte[] data) {
         return valDeserializer.deserialize(topic, headers, data);
     }
 
@@ -74,10 +74,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
         // if deserializers are null, get the default ones from the context
         if (this.keyDeserializer == null) {
-            this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
+            this.keyDeserializer = (Deserializer<KIn>) context.keySerde().deserializer();
         }
         if (this.valDeserializer == null) {
-            this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+            this.valDeserializer = (Deserializer<VIn>) context.valueSerde().deserializer();
         }
 
         // if deserializers are internal wrapping deserializers that may need to be given the default
@@ -92,7 +92,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
 
     @Override
-    public void process(final K key, final V value) {
+    public void process(final KIn key, final VIn value) {
         context.forward(key, value);
         processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index abdd567..a046c0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -158,7 +158,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             );
         }
 
-        for (final ProcessorNode<?, ?> sourceNode : topology.sources()) {
+        for (final ProcessorNode<?, ?, ?, ?> sourceNode : topology.sources()) {
             final String sourceNodeName = sourceNode.name();
             e2eLatencySensors.put(
                 sourceNodeName,
@@ -296,7 +296,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         // close the processors
         // make sure close() is called for each node even when there is a RuntimeException
         RuntimeException exception = null;
-        for (final ProcessorNode<?, ?> node : topology.processors()) {
+        for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
             processorContext.setCurrentNode(node);
             try {
                 node.close();
@@ -669,7 +669,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         try {
             // process the record by passing it to the source node of the topology
-            final ProcessorNode<Object, Object> currNode = (ProcessorNode<Object, Object>) recordInfo.node();
+            final ProcessorNode<Object, Object, Object, Object> currNode = (ProcessorNode<Object, Object, Object, Object>) recordInfo.node();
             final TopicPartition partition = recordInfo.partition();
 
             log.trace("Start processing one record [{}]", record);
@@ -738,7 +738,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @Override
-    public void punctuate(final ProcessorNode<?, ?> node,
+    public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
                           final long timestamp,
                           final PunctuationType type,
                           final Punctuator punctuator) {
@@ -763,7 +763,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
-    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
+    private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?, ?, ?> currNode, final long wallClockTime) {
         processorContext.setRecordContext(
             new ProcessorRecordContext(
                 record.timestamp,
@@ -843,7 +843,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
-        for (final ProcessorNode<?, ?> node : topology.processors()) {
+        for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
             processorContext.setCurrentNode(node);
             try {
                 node.init(processorContext);
@@ -1108,7 +1108,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
 
         public RecordQueue createQueue(final TopicPartition partition) {
-            final SourceNode<?, ?> source = topology.source(partition.topic());
+            final SourceNode<?, ?, ?, ?> source = topology.source(partition.topic());
             final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor();
             final TimestampExtractor timestampExtractor = sourceTimestampExtractor != null ? sourceTimestampExtractor : defaultTimestampExtractor;
             return new RecordQueue(
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index bdc60a7..ecf23ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -851,7 +851,7 @@ public class StreamsBuilderTest {
     }
 
     private static void assertNamesForOperation(final ProcessorTopology topology, final String... expected) {
-        final List<ProcessorNode<?, ?>> processors = topology.processors();
+        final List<ProcessorNode<?, ?, ?, ?>> processors = topology.processors();
         assertEquals("Invalid number of expected processors", expected.length, processors.size());
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processors.get(i).name());
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
index 69c3f38..e91b007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -54,13 +54,13 @@ public class TopologyTestDriverWrapper extends TopologyTestDriver {
      * @param name the name to search for
      * @return the processor matching the search name
      */
-    public ProcessorNode<?, ?> getProcessor(final String name) {
-        for (final ProcessorNode<?, ?> node : processorTopology.processors()) {
+    public ProcessorNode<?, ?, ?, ?> getProcessor(final String name) {
+        for (final ProcessorNode<?, ?, ?, ?> node : processorTopology.processors()) {
             if (node.name().equals(name)) {
                 return node;
             }
         }
-        for (final ProcessorNode<?, ?> node : globalTopology.processors()) {
+        for (final ProcessorNode<?, ?, ?, ?> node : globalTopology.processors()) {
             if (node.name().equals(name)) {
                 return node;
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 17047a1..4b3df44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -1580,9 +1580,9 @@ public class KStreamImplTest {
 
         final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
 
-        final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
+        final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode<?, ?> sourceNode : topology.sources()) {
+        for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
                 assertNull(sourceNode.getTimestampExtractor());
             } else {
@@ -1609,9 +1609,9 @@ public class KStreamImplTest {
 
         final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
 
-        final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
+        final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode<?, ?> sourceNode : topology.sources()) {
+        for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
                 assertNull(sourceNode.getTimestampExtractor());
             } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 7119d5b..ad8cd0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -30,9 +30,6 @@ import org.hamcrest.core.IsInstanceOf;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collections;
-
-import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
@@ -54,7 +51,7 @@ public class GlobalProcessorContextImplTest {
 
     private GlobalProcessorContextImpl globalContext;
 
-    private ProcessorNode<?, ?> child;
+    private ProcessorNode<Object, Object, Object, Object> child;
     private ProcessorRecordContext recordContext;
 
     @Before
@@ -82,20 +79,12 @@ public class GlobalProcessorContextImplTest {
             null,
             null);
 
-        final ProcessorNode<?, ?> processorNode = mock(ProcessorNode.class);
-        globalContext.setCurrentNode(processorNode);
+        final ProcessorNode<Object, Object, Object, Object> processorNode = new ProcessorNode<>("testNode");
 
         child = mock(ProcessorNode.class);
+        processorNode.addChild(child);
 
-        expect(processorNode.children())
-            .andReturn(Collections.singletonList(child))
-            .anyTimes();
-        expect(processorNode.getChild(CHILD_PROCESSOR))
-            .andReturn(child);
-        expect(processorNode.getChild(anyString()))
-            .andReturn(null);
-        replay(processorNode);
-
+        globalContext.setCurrentNode(processorNode);
         recordContext = mock(ProcessorRecordContext.class);
         globalContext.setRecordContext(recordContext);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 42d3fc9..6ce2bae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -58,14 +58,14 @@ public class GlobalStateTaskTest {
     private final String topic2 = "t2";
     private final TopicPartition t1 = new TopicPartition(topic1, 1);
     private final TopicPartition t2 = new TopicPartition(topic2, 1);
-    private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(
+    private final MockSourceNode<String, String, ?, ?> sourceOne = new MockSourceNode<>(
         new StringDeserializer(),
         new StringDeserializer());
-    private final MockSourceNode<Integer, Integer>  sourceTwo = new MockSourceNode<>(
+    private final MockSourceNode<Integer, Integer, ?, ?>  sourceTwo = new MockSourceNode<>(
         new IntegerDeserializer(),
         new IntegerDeserializer());
-    private final MockProcessorNode<?, ?> processorOne = new MockProcessorNode<>();
-    private final MockProcessorNode<?, ?> processorTwo = new MockProcessorNode<>();
+    private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode<>();
+    private final MockProcessorNode<?, ?, ?, ?> processorTwo = new MockProcessorNode<>();
 
     private final Map<TopicPartition, Long> offsets = new HashMap<>();
     private File testDirectory = TestUtils.tempDirectory("global-store");
@@ -78,7 +78,7 @@ public class GlobalStateTaskTest {
     @Before
     public void before() {
         final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
-        final Map<String, SourceNode<?, ?>> sourceByTopics = new HashMap<>();
+        final Map<String, SourceNode<?, ?, ?, ?>> sourceByTopics = new HashMap<>();
         sourceByTopics.put(topic1, sourceOne);
         sourceByTopics.put(topic2, sourceTwo);
         final Map<String, String> storeToTopic = new HashMap<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 9bcd47e..5e3c2dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -733,9 +733,9 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(null);
     }
 
-    private Set<String> nodeNames(final Collection<ProcessorNode<?, ?>> nodes) {
+    private Set<String> nodeNames(final Collection<ProcessorNode<?, ?, ?, ?>> nodes) {
         final Set<String> nodeNames = new HashSet<>();
-        for (final ProcessorNode<?, ?> node : nodes) {
+        for (final ProcessorNode<?, ?, ?, ?> node : nodes) {
             nodeNames.add(node.name());
         }
         return nodeNames;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index d3b05bc..32e498c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -150,7 +150,7 @@ public class ProcessorContextImplTest {
         ((InternalProcessorContext) context).transitionToActive(task, null, null);
         EasyMock.expect(task.recordCollector()).andStubReturn(recordCollector);
 
-        context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
+        context.setCurrentNode(new ProcessorNode<String, Long, Object, Object>("fake", null,
             new HashSet<>(asList(
                 "LocalKeyValueStore",
                 "LocalTimestampedKeyValueStore",
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 91fa83e..08125e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -109,7 +109,7 @@ public class ProcessorNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion);
         final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final ProcessorNode<Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+        final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
         node.init(context);
 
         final String threadId = Thread.currentThread().getName();
@@ -198,7 +198,7 @@ public class ProcessorNodeTest {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST);
         final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final ProcessorNode<Object, Object> node = new ProcessorNode<Object, Object>("name", new ClassCastProcessor(), Collections.<String>emptySet());
+        final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
         final StreamsException se = assertThrows(StreamsException.class, () -> node.process("aKey", "aValue"));
         assertThat(se.getCause(), instanceOf(ClassCastException.class));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
index 25ea3ef..b4cef8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
@@ -26,8 +26,8 @@ public final class ProcessorTopologyFactories {
     private ProcessorTopologyFactories() {}
 
 
-    public static ProcessorTopology with(final List<ProcessorNode<?, ?>> processorNodes,
-                                         final Map<String, SourceNode<?, ?>> sourcesByTopic,
+    public static ProcessorTopology with(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+                                         final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
                                          final List<StateStore> stateStoresByName,
                                          final Map<String, String> storeToChangelogTopic) {
         return new ProcessorTopology(processorNodes,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 964f352..55043e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -28,7 +28,7 @@ import static org.junit.Assert.assertEquals;
 
 public class PunctuationQueueTest {
 
-    private final MockProcessorNode<String, String> node = new MockProcessorNode<>();
+    private final MockProcessorNode<String, String, ?, ?> node = new MockProcessorNode<>();
     private final PunctuationQueue queue = new PunctuationQueue();
     private final Punctuator punctuator = new Punctuator() {
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 7299067..4ae9d32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -68,7 +68,7 @@ public class RecordDeserializerTest {
         assertEquals(rawRecord.headers(), record.headers());
     }
 
-    static class TheSourceNode extends SourceNode<Object, Object> {
+    static class TheSourceNode extends SourceNode<Object, Object, Object, Object> {
         private final boolean keyThrowsException;
         private final boolean valueThrowsException;
         private final Object key;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 6929bb8e..895e045 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -64,7 +64,7 @@ public class RecordQueueTest {
         StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
         new MockRecordCollector()
     );
-    private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
+    private final MockSourceNode<Integer, Integer, ?, ?> mockSourceNodeWithMetrics
         = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
         new TopicPartition("topic", 1),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 0c303d2..6234b0f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -33,12 +33,12 @@ public class SinkNodeTest {
     private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
     private final RecordCollector recordCollector = new MockRecordCollector();
     private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, recordCollector);
-    private final SinkNode<byte[], byte[]> sink = new SinkNode<>("anyNodeName",
+    private final SinkNode<byte[], byte[], ?, ?> sink = new SinkNode<>("anyNodeName",
             new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null);
 
     // Used to verify that the correct exceptions are thrown if the compiler checks are bypassed
     @SuppressWarnings("unchecked")
-    private final SinkNode<Object, Object> illTypedSink = (SinkNode) sink;
+    private final SinkNode<Object, Object, ?, ?> illTypedSink = (SinkNode) sink;
 
     @Before
     public void before() {
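
Note: the ill-typed sink works by casting through the raw SinkNode type, which discards all four type arguments at once; only the assignment needs @SuppressWarnings("unchecked"), and mistyped writes then surface at run time, which is exactly what the test verifies. A minimal sketch of the same trick on a hypothetical Sink class:

    class Sink<KIn, VIn, KOut, VOut> {
        void write(final KIn key, final VIn value) { }
    }

    class IllTypedExample {
        private final Sink<byte[], byte[], ?, ?> sink = new Sink<>();

        // Raw cast erases the type arguments; re-labelling the inputs as
        // Object defers any type error to run time.
        @SuppressWarnings("unchecked")
        private final Sink<Object, Object, ?, ?> illTypedSink = (Sink) sink;
    }
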
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 028c4fe..f565895 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
 public class SourceNodeTest {
     @Test
     public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
-        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializeKey, is("topic" + headers + "data"));
@@ -51,7 +51,7 @@ public class SourceNodeTest {
 
     @Test
     public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
-        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializedValue, is("topic" + headers + "data"));
@@ -83,7 +83,7 @@ public class SourceNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion);
         final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final SourceNode<String, String> node =
+        final SourceNode<String, String, ?, ?> node =
             new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
         node.init(context);
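
Note: these assertions ("topic" + headers + "data") imply that TheDeserializer overrides the headers-aware three-argument deserialize overload and concatenates what it receives. A sketch of such a deserializer; the class name and body here are assumptions reconstructed from the assertions, not the test's actual code.

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.Deserializer;

    // Overriding the three-argument overload lets the test prove that the
    // topic name and headers actually reach the deserializer.
    class TopicHeadersDataDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(final String topic, final byte[] data) {
            return deserialize(topic, null, data);
        }

        @Override
        public String deserialize(final String topic, final Headers headers, final byte[] data) {
            return topic + headers + new String(data, StandardCharsets.UTF_8);
        }
    }
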
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index facbf2e..ea20955 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -127,9 +127,9 @@ public class StreamTaskTest {
     private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
     private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
 
-    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Integer key, final Integer value) {
             throw new RuntimeException("KABOOM!");
@@ -140,8 +140,8 @@ public class StreamTaskTest {
             throw new RuntimeException("KABOOM!");
         }
     };
-    private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L);
-    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
+    private final MockProcessorNode<Integer, Integer, ?, ?> processorStreamTime = new MockProcessorNode<>(10L);
+    private final MockProcessorNode<Integer, Integer, ?, ?> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
 
     private final String storeName = "store";
     private final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, false);
@@ -175,8 +175,8 @@ public class StreamTaskTest {
         }
     };
 
-    private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?>> processorNodes,
-                                                           final Map<String, SourceNode<?, ?>> sourcesByTopic,
+    private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+                                                           final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
                                                            final Set<String> repartitionTopics) {
         return new ProcessorTopology(processorNodes,
                                      sourcesByTopic,
@@ -187,8 +187,8 @@ public class StreamTaskTest {
                                      repartitionTopics);
     }
 
-    private static ProcessorTopology withSources(final List<ProcessorNode<?, ?>> processorNodes,
-                                                 final Map<String, SourceNode<?, ?>> sourcesByTopic) {
+    private static ProcessorTopology withSources(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+                                                 final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic) {
         return new ProcessorTopology(processorNodes,
                                      sourcesByTopic,
                                      Collections.emptyMap(),
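
Note: two details in this file are worth calling out. First, source3 is declared with wildcard output types but constructed as <Integer, Integer, Object, Object> because, on the Java 8 toolchain this branch still supports, an anonymous subclass cannot use the diamond. Second, the ProcessorTopology helpers now take fully wildcarded node lists, so nodes with different generics can share one topology. A sketch of the first point with a hypothetical Node class:

    class Node<KIn, VIn, KOut, VOut> {
        void process(final KIn key, final VIn value) { }
    }

    class AnonymousSubclassExample {
        // The declaration falls back to wildcards while the instantiation
        // pins Object for the unused output side.
        private final Node<Integer, Integer, ?, ?> failing =
            new Node<Integer, Integer, Object, Object>() {  // 'new Node<>() { ... }' needs Java 9+
                @Override
                void process(final Integer key, final Integer value) {
                    throw new RuntimeException("KABOOM!");
                }
            };
    }
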
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 4c782a4..2552fb4 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -298,12 +298,12 @@ public class InternalMockProcessorContext
         if (toInternal.hasTimestamp()) {
             setTime(toInternal.timestamp());
         }
-        final ProcessorNode<?, ?> thisNode = currentNode;
+        final ProcessorNode<?, ?, ?, ?> thisNode = currentNode;
         try {
-            for (final ProcessorNode<?, ?> childNode : thisNode.children()) {
+            for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
                 if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
                     currentNode = childNode;
-                    ((ProcessorNode<Object, Object>) childNode).process(key, value);
+                    ((ProcessorNode<Object, Object, ?, ?>) childNode).process(key, value);
                     toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
                                            // Processors and toInternal might have been modified
                 }
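
Note: the cast to ProcessorNode<Object, Object, ?, ?> narrows only the input side: any key/value can be passed where Object is expected, and the wildcarded output side stays unconstrained. The cast is unchecked, but erasure makes it harmless as long as the runtime key/value types match what each child expects, which the topology wiring guarantees. A minimal sketch of the same dispatch pattern with a hypothetical Node class:

    import java.util.List;

    class Node<KIn, VIn, KOut, VOut> {
        void process(final KIn key, final VIn value) { }
    }

    class ForwardExample {
        // Children are held fully wildcarded, then re-viewed with Object
        // inputs just long enough to call process().
        @SuppressWarnings("unchecked")
        static void forwardToAll(final List<Node<?, ?, ?, ?>> children,
                                 final Object key,
                                 final Object value) {
            for (final Node<?, ?, ?, ?> child : children) {
                ((Node<Object, Object, ?, ?>) child).process(key, value);
            }
        }
    }
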
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 718e5af..9a8b407 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -23,12 +23,12 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
+public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
 
     private static final String NAME = "MOCK-PROCESS-";
     private static final AtomicInteger INDEX = new AtomicInteger(1);
 
-    public final MockProcessor<K, V> mockProcessor;
+    public final MockProcessor<KIn, VIn> mockProcessor;
 
     public boolean closed;
     public boolean initialized;
@@ -38,14 +38,14 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     }
 
     public MockProcessorNode(final long scheduleInterval, final PunctuationType punctuationType) {
-        this(new MockProcessor<K, V>(punctuationType, scheduleInterval));
+        this(new MockProcessor<>(punctuationType, scheduleInterval));
     }
 
     public MockProcessorNode() {
-        this(new MockProcessor<K, V>());
+        this(new MockProcessor<>());
     }
 
-    private MockProcessorNode(final MockProcessor<K, V> mockProcessor) {
+    private MockProcessorNode(final MockProcessor<KIn, VIn> mockProcessor) {
         super(NAME + INDEX.getAndIncrement(), mockProcessor, Collections.<String>emptySet());
 
         this.mockProcessor = mockProcessor;
@@ -58,7 +58,7 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     }
 
     @Override
-    public void process(final K key, final V value) {
+    public void process(final KIn key, final VIn value) {
         processor().process(key, value);
     }
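
Note: renaming K/V to KIn/VIn makes explicit that the mock's recording (via its MockProcessor delegate) sits entirely on the input side, while KOut/VOut exist only to satisfy the new four-parameter ProcessorNode signature; the old explicit <K, V> witnesses on the constructor calls become plain diamonds because the target type now supplies the arguments. A short usage sketch, grounded in the constructors above:

    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.test.MockProcessorNode;

    class MockNodeUsage {
        // Callers can leave the unused output side as wildcards.
        private final MockProcessorNode<Integer, Integer, ?, ?> streamTime =
            new MockProcessorNode<>(10L);
        private final MockProcessorNode<Integer, Integer, ?, ?> wallClock =
            new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
    }
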
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index f582202..5130d46 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class MockSourceNode<K, V> extends SourceNode<K, V> {
+public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
 
     private static final String NAME = "MOCK-SOURCE-";
     private static final AtomicInteger INDEX = new AtomicInteger(1);
 
     public int numReceived = 0;
-    public final ArrayList<K> keys = new ArrayList<>();
-    public final ArrayList<V> values = new ArrayList<>();
+    public final ArrayList<KIn> keys = new ArrayList<>();
+    public final ArrayList<VIn> values = new ArrayList<>();
     public boolean initialized;
     public boolean closed;
 
-    public MockSourceNode(final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) {
+    public MockSourceNode(final Deserializer<KIn> keyDeserializer, final Deserializer<VIn> valDeserializer) {
         super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer);
     }
 
     @Override
-    public void process(final K key, final V value) {
+    public void process(final KIn key, final VIn value) {
         this.numReceived++;
         this.keys.add(key);
         this.values.add(value);
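
Note: MockSourceNode follows the same rename, with its recording fields (keys, values, numReceived) all on the input side. A usage sketch based on the constructor and process method above:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.test.MockSourceNode;

    class MockSourceUsage {
        private final MockSourceNode<String, String, ?, ?> source =
            new MockSourceNode<>(Serdes.String().deserializer(), Serdes.String().deserializer());

        void exercise() {
            source.process("key", "value");  // recorded in keys/values; numReceived becomes 1
        }
    }
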