You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:02:57 UTC

[kafka] 02/05: builds

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ddd0f5d0dd801bbcf9b0c7206e67a8dcfd4db7a4
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 14:25:24 2021 -0500

    builds
---
 .../streams/kstream/internals/KTableFilter.java    |  2 +-
 .../kstream/internals/KTableValueGetter.java       |  4 +--
 .../internals/TimestampedCacheFlushListener.java   |  1 +
 .../ForeignJoinSubscriptionProcessorSupplier.java  |  2 +-
 .../SubscriptionStoreReceiveProcessorSupplier.java |  2 +-
 .../internals/AbstractProcessorContext.java        |  2 +-
 .../internals/GlobalProcessorContextImpl.java      |  2 +-
 .../processor/internals/GlobalStateUpdateTask.java |  5 ++--
 .../processor/internals/ProcessorContextImpl.java  |  2 +-
 .../state/internals/MeteredKeyValueStore.java      | 30 +++++++++++++++++-----
 .../state/internals/MeteredSessionStore.java       | 30 +++++++++++++++++-----
 .../state/internals/MeteredWindowStore.java        | 30 +++++++++++++++++-----
 .../streams/kstream/internals/KStreamImplTest.java |  8 +++---
 ...KStreamSessionWindowAggregateProcessorTest.java |  2 +-
 .../kstream/internals/KTableReduceTest.java        |  2 +-
 .../internals/SessionCacheFlushListenerTest.java   |  2 +-
 .../TimestampedCacheFlushListenerTest.java         |  9 ++++---
 .../internals/TimestampedTupleForwarderTest.java   |  9 +++++--
 .../internals/AbstractProcessorContextTest.java    |  2 +-
 .../processor/internals/GlobalStateTaskTest.java   |  6 ++---
 .../processor/internals/ProcessorNodeTest.java     |  8 +++---
 .../internals/ProcessorTopologyFactories.java      |  2 +-
 .../internals/RecordDeserializerTest.java          |  2 +-
 .../processor/internals/RecordQueueTest.java       |  6 +++--
 .../streams/processor/internals/SinkNodeTest.java  |  8 +++---
 .../processor/internals/SourceNodeTest.java        |  8 +++---
 .../processor/internals/StreamTaskTest.java        | 22 ++++++++--------
 .../streams/state/KeyValueStoreTestDriver.java     |  1 +
 .../state/internals/AbstractKeyValueStoreTest.java |  2 ++
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  2 +-
 .../internals/AbstractSessionBytesStoreTest.java   |  3 ++-
 .../internals/AbstractWindowBytesStoreTest.java    |  2 +-
 .../state/internals/CacheFlushListenerStub.java    | 12 +++++++++
 .../CachingInMemoryKeyValueStoreTest.java          |  4 +--
 .../internals/CachingInMemorySessionStoreTest.java | 18 +++++++++++--
 .../CachingPersistentSessionStoreTest.java         | 18 +++++++++++--
 .../CachingPersistentWindowStoreTest.java          |  4 +--
 .../ChangeLoggingKeyValueBytesStoreTest.java       |  3 ++-
 ...geLoggingTimestampedKeyValueBytesStoreTest.java |  3 ++-
 .../CompositeReadOnlyKeyValueStoreTest.java        | 11 ++++++--
 .../state/internals/InMemoryKeyValueStoreTest.java |  1 +
 .../state/internals/InMemoryLRUCacheStoreTest.java |  1 +
 .../state/internals/InMemoryWindowStoreTest.java   |  1 +
 .../state/internals/KeyValueSegmentsTest.java      |  2 +-
 .../MeteredTimestampedWindowStoreTest.java         |  2 +-
 .../state/internals/MeteredWindowStoreTest.java    |  2 +-
 .../streams/state/internals/RocksDBStoreTest.java  |  3 ++-
 .../RocksDBTimeOrderedWindowStoreTest.java         |  2 +-
 .../state/internals/RocksDBWindowStoreTest.java    |  1 +
 .../state/internals/SegmentIteratorTest.java       |  3 ++-
 .../state/internals/TimestampedSegmentsTest.java   |  2 +-
 .../kafka/test/InternalMockProcessorContext.java   |  8 +++---
 .../kafka/test/MockInternalProcessorContext.java   |  2 +-
 .../org/apache/kafka/test/MockProcessorNode.java   |  2 +-
 .../java/org/apache/kafka/test/MockSourceNode.java |  4 +--
 .../apache/kafka/test/NoOpProcessorContext.java    |  2 +-
 56 files changed, 227 insertions(+), 102 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index bbffea6..a23dce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
             parentGetter.init(context);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index c939234..12145fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 public interface KTableValueGetter<K, V> {
 
-    void init(ProcessorContext<Void, Void> context);
+    void init(ProcessorContext context);
 
     ValueAndTimestamp<V> get(K key);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 6dbf435..97ef6cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
 class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index fd95105..3c12afa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -63,7 +63,7 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements Proc
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+            final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
             droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                 Thread.currentThread().getName(),
                 internalProcessorContext.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 61fb1c1..98bcd4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -60,7 +60,7 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
             @Override
             public void init(final ProcessorContext context) {
                 super.init(context);
-                final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+                final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
 
                 droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                     Thread.currentThread().getName(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 37ffbdc..79b2d0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-public abstract class AbstractProcessorContext implements InternalProcessorContext<Object, Object> {
+public abstract class AbstractProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut, VOut> {
 
     private final TaskId taskId;
     private final String applicationId;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index be3cf55..dbdd6a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -34,7 +34,7 @@ import java.time.Duration;
 
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
-public class GlobalProcessorContextImpl extends AbstractProcessorContext {
+public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> {
 
     private final GlobalStateManager stateManager;
     private final Time time;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 6b1378b..e5f591c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -69,7 +69,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
-            final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic);
+            final SourceNode<?, ?> source = topology.source(sourceTopic);
             deserializers.put(
                 sourceTopic,
                 new RecordDeserializer(
@@ -111,7 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
+            ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
         }
 
         offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
@@ -138,6 +138,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void initTopology() {
         for (final ProcessorNode<?, ?, ?, ?> node : this.topology.processors()) {
             processorContext.setCurrentNode(node);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index dcae2ab..bd7ece4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -42,7 +42,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDur
 import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
-public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {
     // the below are null for standby tasks
     private StreamTask streamTask;
     private RecordCollector collector;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 18c44e8..62542e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -173,12 +175,28 @@ public class MeteredKeyValueStore<K, V>
         final KeyValueStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
-                (rawKey, rawNewValue, rawOldValue, timestamp) -> listener.apply(
-                    serdes.keyFrom(rawKey),
-                    rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
-                    rawOldValue != null ? serdes.valueFrom(rawOldValue) : null,
-                    timestamp
-                ),
+                new CacheFlushListener<byte[], byte[]>() {
+                    @Override
+                    public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) {
+                        listener.apply(
+                            serdes.keyFrom(rawKey),
+                            rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
+                            rawOldValue != null ? serdes.valueFrom(rawOldValue) : null,
+                            timestamp
+                        );
+                    }
+
+                    @Override
+                    public void apply(Record<byte[], Change<byte[]>> record) {
+                        listener.apply(
+                            record.withKey(serdes.keyFrom(record.key()))
+                            .withValue(new Change<>(
+                                record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+                                record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+                            ))
+                        );
+                    }
+                },
                 sendOldValues);
         }
         return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 1fbc8db..d305951 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -144,12 +146,28 @@ public class MeteredSessionStore<K, V>
         final SessionStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
-                (key, newValue, oldValue, timestamp) -> listener.apply(
-                    SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
-                    newValue != null ? serdes.valueFrom(newValue) : null,
-                    oldValue != null ? serdes.valueFrom(oldValue) : null,
-                    timestamp
-                ),
+                new CacheFlushListener<byte[], byte[]>() {
+                    @Override
+                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                        listener.apply(
+                            SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
+                            newValue != null ? serdes.valueFrom(newValue) : null,
+                            oldValue != null ? serdes.valueFrom(oldValue) : null,
+                            timestamp
+                        );
+                    }
+
+                    @Override
+                    public void apply(Record<byte[], Change<byte[]>> record) {
+                        listener.apply(
+                            record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic()))
+                                  .withValue(new Change<>(
+                                      record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+                                      record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+                                  ))
+                        );
+                    }
+                },
                 sendOldValues);
         }
         return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 91b4387..82f65a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -148,12 +150,28 @@ public class MeteredWindowStore<K, V>
         final WindowStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
-                (key, newValue, oldValue, timestamp) -> listener.apply(
-                    WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
-                    newValue != null ? serdes.valueFrom(newValue) : null,
-                    oldValue != null ? serdes.valueFrom(oldValue) : null,
-                    timestamp
-                ),
+                new CacheFlushListener<byte[], byte[]>() {
+                    @Override
+                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                        listener.apply(
+                            WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
+                            newValue != null ? serdes.valueFrom(newValue) : null,
+                            oldValue != null ? serdes.valueFrom(oldValue) : null,
+                            timestamp
+                        );
+                    }
+
+                    @Override
+                    public void apply(Record<byte[], Change<byte[]>> record) {
+                        listener.apply(
+                            record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
+                                  .withValue(new Change<>(
+                                      record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+                                      record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+                                  ))
+                        );
+                    }
+                },
                 sendOldValues);
         }
         return false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index ac5db68..7bdcea8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -1525,9 +1525,9 @@ public class KStreamImplTest {
 
         final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
 
-        final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
+        final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
+        for (final SourceNode<?, ?> sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
                 assertNull(sourceNode.getTimestampExtractor());
             } else {
@@ -1554,9 +1554,9 @@ public class KStreamImplTest {
 
         final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
 
-        final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
+        final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
+        for (final SourceNode<?, ?> sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
                 assertNull(sourceNode.getTimestampExtractor());
             } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 0982337..9ee3347 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -98,7 +98,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     private void setup(final String builtInMetricsVersion, final boolean enableCache) {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime());
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<Object, Object>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index b360151..87d6e87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -36,7 +36,7 @@ public class KTableReduceTest {
 
     @Test
     public void shouldAddAndSubtract() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext();
+        final InternalMockProcessorContext<String, Change<Set<String>>> context = new InternalMockProcessorContext<>();
 
         final Processor<String, Change<Set<String>>> reduceProcessor =
             new KTableReduce<String, Set<String>>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index b25febf..a826d50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
 public class SessionCacheFlushListenerTest {
     @Test
     public void shouldForwardKeyNewValueOldValueAndTimestamp() {
-        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<Windowed<String>,Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 38ef5c6..7c1b0e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.junit.Test;
@@ -31,7 +32,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardValueTimestampIfNewValueExists() {
-        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -42,7 +43,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>(context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
             "key",
             ValueAndTimestamp.make("newValue", 42L),
             ValueAndTimestamp.make("oldValue", 21L),
@@ -53,7 +54,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardParameterTimestampIfNewValueIsNull() {
-        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -64,7 +65,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>(context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
             "key",
             null,
             ValueAndTimestamp.make("oldValue", 21L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 52a5fcf..dc2767c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -39,12 +39,17 @@ public class TimestampedTupleForwarderTest {
 
     private void setFlushListener(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
-        final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
+        final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class);
 
         expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
         replay(store);
 
-        new TimestampedTupleForwarder<>(store, null, flushListener, sendOldValues);
+        new TimestampedTupleForwarder<>(
+            store,
+            (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null,
+            flushListener,
+            sendOldValues
+        );
 
         verify(store);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index b813422..e4968e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -171,7 +171,7 @@ public class AbstractProcessorContextTest {
         );
     }
 
-    private static class TestProcessorContext extends AbstractProcessorContext {
+    private static class TestProcessorContext extends AbstractProcessorContext<Object, Object> {
         static Properties config;
         static {
             config = getStreamsConfig();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index e4bc600..31be9dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -61,10 +61,10 @@ public class GlobalStateTaskTest {
     private final String topic2 = "t2";
     private final TopicPartition t1 = new TopicPartition(topic1, 1);
     private final TopicPartition t2 = new TopicPartition(topic2, 1);
-    private final MockSourceNode<String, String, ?, ?> sourceOne = new MockSourceNode<>(
+    private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(
         new StringDeserializer(),
         new StringDeserializer());
-    private final MockSourceNode<Integer, Integer, ?, ?>  sourceTwo = new MockSourceNode<>(
+    private final MockSourceNode<Integer, Integer>  sourceTwo = new MockSourceNode<>(
         new IntegerDeserializer(),
         new IntegerDeserializer());
     private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode<>();
@@ -81,7 +81,7 @@ public class GlobalStateTaskTest {
     @Before
     public void before() {
         final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
-        final Map<String, SourceNode<?, ?, ?, ?>> sourceByTopics = new HashMap<>();
+        final Map<String, SourceNode<?, ?>> sourceByTopics = new HashMap<>();
         sourceByTopics.put(topic1, sourceOne);
         sourceByTopics.put(topic2, sourceTwo);
         final Map<String, String> storeToTopic = new HashMap<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 84bdf51..ef46ab3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -110,8 +110,8 @@ public class ProcessorNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime());
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+        final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
+        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
         node.init(context);
 
         final String threadId = Thread.currentThread().getName();
@@ -196,8 +196,8 @@ public class ProcessorNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
+        final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
+        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
         final StreamsException se = assertThrows(
             StreamsException.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
index b4cef8a..57e4490 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
@@ -27,7 +27,7 @@ public final class ProcessorTopologyFactories {
 
 
     public static ProcessorTopology with(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
-                                         final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
+                                         final Map<String, SourceNode<?, ?>> sourcesByTopic,
                                          final List<StateStore> stateStoresByName,
                                          final Map<String, String> storeToChangelogTopic) {
         return new ProcessorTopology(processorNodes,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 18d17ae..448ceaf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -68,7 +68,7 @@ public class RecordDeserializerTest {
         assertEquals(rawRecord.headers(), record.headers());
     }
 
-    static class TheSourceNode extends SourceNode<Object, Object, Object, Object> {
+    static class TheSourceNode extends SourceNode<Object, Object> {
         private final boolean keyThrowsException;
         private final boolean valueThrowsException;
         private final Object key;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 142e85a..d23311b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -62,11 +62,12 @@ public class RecordQueueTest {
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
 
-    final InternalMockProcessorContext context = new InternalMockProcessorContext(
+    @SuppressWarnings("rawtypes")
+    final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
         StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
         new MockRecordCollector()
     );
-    private final MockSourceNode<Integer, Integer, ?, ?> mockSourceNodeWithMetrics
+    private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
         = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
         new TopicPartition("topic", 1),
@@ -86,6 +87,7 @@ public class RecordQueueTest {
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
+    @SuppressWarnings("unchecked")
     @Before
     public void before() {
         mockSourceNodeWithMetrics.init(context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 30c7b1b..7e7f7b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -33,13 +33,13 @@ public class SinkNodeTest {
     private final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
     private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
     private final RecordCollector recordCollector = new MockRecordCollector();
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, recordCollector);
-    private final SinkNode<byte[], byte[], ?, ?> sink = new SinkNode<>("anyNodeName",
+    private final InternalMockProcessorContext<Void, Void> context = new InternalMockProcessorContext<>(anyStateSerde, recordCollector);
+    private final SinkNode<byte[], byte[]> sink = new SinkNode<>("anyNodeName",
             new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null);
 
     // Used to verify that the correct exceptions are thrown if the compiler checks are bypassed
-    @SuppressWarnings("unchecked")
-    private final SinkNode<Object, Object, ?, ?> illTypedSink = (SinkNode) sink;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private final SinkNode<Object, Object> illTypedSink = (SinkNode) sink;
 
     @Before
     public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 92e4719..00c5647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class SourceNodeTest {
     @Test
     public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
-        final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializeKey, is("topic" + headers + "data"));
@@ -52,7 +52,7 @@ public class SourceNodeTest {
 
     @Test
     public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
-        final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializedValue, is("topic" + headers + "data"));
@@ -84,8 +84,8 @@ public class SourceNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime());
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final SourceNode<String, String, ?, ?> node =
+        final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
+        final SourceNode<String, String> node =
             new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
         node.init(context);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9763c4f..0e0f314 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -137,9 +137,9 @@ public class StreamTaskTest {
     private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
     private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
 
-    private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Record<Integer, Integer> record) {
             throw new RuntimeException("KABOOM!");
@@ -150,7 +150,7 @@ public class StreamTaskTest {
             throw new RuntimeException("KABOOM!");
         }
     };
-    private final MockSourceNode<Integer, Integer, ?, ?> timeoutSource = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Record<Integer, Integer> record) {
             throw new TimeoutException("Kaboom!");
@@ -192,7 +192,7 @@ public class StreamTaskTest {
     };
 
     private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
-                                                           final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
+                                                           final Map<String, SourceNode<?, ?>> sourcesByTopic,
                                                            final Set<String> repartitionTopics) {
         return new ProcessorTopology(processorNodes,
                                      sourcesByTopic,
@@ -204,7 +204,7 @@ public class StreamTaskTest {
     }
 
     private static ProcessorTopology withSources(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
-                                                 final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic) {
+                                                 final Map<String, SourceNode<?, ?>> sourcesByTopic) {
         return new ProcessorTopology(processorNodes,
                                      sourcesByTopic,
                                      emptyMap(),
@@ -622,11 +622,11 @@ public class StreamTaskTest {
         metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
 
         // Create a processor that only forwards even keys to test the metrics at the source and terminal nodes
-        final MockSourceNode<Integer, Integer, Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer, Integer, Integer>(intDeserializer, intDeserializer) {
-            InternalProcessorContext context;
+        final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+            InternalProcessorContext<Integer, Integer> context;
 
             @Override
-            public void init(final InternalProcessorContext context) {
+            public void init(final InternalProcessorContext<Integer, Integer> context) {
                 this.context = context;
                 super.init(context);
             }
@@ -1528,6 +1528,7 @@ public class StreamTaskTest {
         assertFalse(checkpointFile.exists());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
@@ -1568,6 +1569,7 @@ public class StreamTaskTest {
         assertThrows(IllegalStateException.class, () -> task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { }));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
         task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
@@ -2480,7 +2482,7 @@ public class StreamTaskTest {
         );
     }
 
-    private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer, Integer, Integer> sourceNode) {
+    private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer> sourceNode) {
         final ProcessorTopology topology = withSources(
             asList(sourceNode, processorStreamTime),
             singletonMap(topic1, sourceNode)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 38e9860..3f438f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -187,6 +187,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final InternalMockProcessorContext context;
     private final StateSerdes<K, V> stateSerdes;
 
+    @SuppressWarnings("unchecked")
     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 5959a80..e050a21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -49,6 +49,7 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("unchecked")
 public abstract class AbstractKeyValueStoreTest {
 
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
@@ -80,6 +81,7 @@ public abstract class AbstractKeyValueStoreTest {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldNotIncludeDeletedFromRangeResult() {
         store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index b103e98..da1050a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -124,7 +124,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         bytesStore = getBytesStore();
 
         stateDir = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             stateDir,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index ed60837..b005905 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -89,7 +89,7 @@ public abstract class AbstractSessionBytesStoreTest {
     public void setUp() {
         sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long());
         recordCollector = new MockRecordCollector();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
@@ -536,6 +536,7 @@ public abstract class AbstractSessionBytesStoreTest {
         assertFalse(iterator.hasNext());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldRestore() {
         final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 08fcb6d..2f9432a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -101,7 +101,7 @@ public abstract class AbstractWindowBytesStoreTest {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
 
         recordCollector = new MockRecordCollector();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             baseDir,
             Serdes.String(),
             Serdes.Integer(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
index ea4b147..fba59cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.api.Record;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -46,4 +47,15 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[],
             )
         );
     }
+
+    @Override
+    public void apply(final Record<byte[], Change<byte[]>> record) { // Record-based variant: stores the deserialized key with its new/old values
+        forwarded.put(
+            keyDeserializer.deserialize(null, record.key()),
+            new Change<>(
+                valueDeserializer.deserialize(null, record.value().newValue),
+                valueDeserializer.deserialize(null, record.value().oldValue)
+            )
+        );
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 66b13c1..ff78642 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -74,7 +74,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         store = new CachingKeyValueStore(underlyingStore);
         store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(null, null, null, null, cache);
+        context = new InternalMockProcessorContext<>(null, null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         store.init((StateStoreContext) context, null);
     }
@@ -200,7 +200,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         EasyMock.replay(underlyingStore);
         store = new CachingKeyValueStore(underlyingStore);
         cache = EasyMock.niceMock(ThreadCache.class);
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         store.init((StateStoreContext) context, store);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index e584e2c..5885a59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -87,7 +88,7 @@ public class CachingInMemorySessionStoreTest {
         underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope");
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -223,7 +224,7 @@ public class CachingInMemorySessionStoreTest {
         EasyMock.replay(underlyingStore);
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = EasyMock.niceMock(ThreadCache.class);
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        final InternalMockProcessorContext context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -752,5 +753,18 @@ public class CachingInMemorySessionStoreTest {
                         valueDesializer.deserialize(null, oldValue)),
                     timestamp));
         }
+
+        @Override
+        public void apply(final Record<byte[], Change<byte[]>> record) { // Record-based variant: captures key, new/old values and the record timestamp
+            forwarded.add(
+                new KeyValueTimestamp<>(
+                    keyDeserializer.deserialize(null, record.key()),
+                    new Change<>(
+                        valueDesializer.deserialize(null, record.value().newValue),
+                        valueDesializer.deserialize(null, record.value().oldValue)),
+                    record.timestamp()
+                )
+            );
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 7f8a394..224c8bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -92,7 +93,7 @@ public class CachingPersistentSessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final InternalMockProcessorContext context =
-            new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+            new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -209,7 +210,7 @@ public class CachingPersistentSessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = EasyMock.niceMock(ThreadCache.class);
         final InternalMockProcessorContext context =
-            new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+            new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -763,5 +764,18 @@ public class CachingPersistentSessionStoreTest {
                 )
             );
         }
+
+        @Override
+        public void apply(final Record<byte[], Change<byte[]>> record) { // Record-based variant: captures key, new/old values and the record timestamp
+            forwarded.add(
+                new KeyValueTimestamp<>(
+                    keyDeserializer.deserialize(null, record.key()),
+                    new Change<>(
+                        valueDesializer.deserialize(null, record.value().newValue),
+                        valueDesializer.deserialize(null, record.value().oldValue)),
+                    record.timestamp()
+                )
+            );
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 13f1f2b..e434c21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -103,7 +103,7 @@ public class CachingPersistentWindowStoreTest {
         cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
         cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -906,7 +906,7 @@ public class CachingPersistentWindowStoreTest {
         EasyMock.replay(underlyingStore);
         cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
         cache = EasyMock.createNiceMock(ThreadCache.class);
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 255994c..9e2532d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@SuppressWarnings("ALL")
 public class ChangeLoggingKeyValueBytesStoreTest {
 
     private final MockRecordCollector collector = new MockRecordCollector();
@@ -64,7 +65,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     }
 
     private InternalMockProcessorContext mockContext() {
-        return new InternalMockProcessorContext(
+        return new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index 8295f7d..d65d948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -42,6 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@SuppressWarnings("rawtypes")
 public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
 
     private final MockRecordCollector collector = new MockRecordCollector();
@@ -64,7 +65,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
     }
 
     private InternalMockProcessorContext mockContext() {
-        return new InternalMockProcessorContext(
+        return new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 736721a..ca8468b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -76,8 +76,15 @@ public class CompositeReadOnlyKeyValueStoreTest {
             Serdes.String())
             .build();
 
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName),
-            Serdes.String(), Serdes.String()), new MockRecordCollector());
+        @SuppressWarnings("rawtypes") final InternalMockProcessorContext context =
+            new InternalMockProcessorContext<>(
+                new StateSerdes<>(
+                    ProcessorStateManager.storeChangelogTopic("appId", storeName),
+                    Serdes.String(),
+                    Serdes.String()
+                ),
+                new MockRecordCollector()
+            );
         context.setTime(1L);
 
         store.init((StateStoreContext) context, store);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 831e684..87a2063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -80,6 +80,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
         return store;
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldRemoveKeysWithNullValues() {
         store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index a044eda..53057b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -132,6 +132,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
         assertEquals(3, driver.numFlushedEntryRemoved());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testRestoreEvict() {
         store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index 2ef9bad..2a9b5bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -67,6 +67,7 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
         LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldRestore() {
         // should be empty initially
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index aeef8ce..c8f1a0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -55,7 +55,7 @@ public class KeyValueSegmentsTest {
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             stateDirectory,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index c05c1ba..4266751 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -87,7 +87,7 @@ public class MeteredTimestampedWindowStoreTest {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
 
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 538c8d3..b18919a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -132,7 +132,7 @@ public class MeteredWindowStoreTest {
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime());
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index c9e9a8d..14166be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -87,6 +87,7 @@ import static org.junit.Assert.assertTrue;
 import static org.powermock.api.easymock.PowerMock.replay;
 import static org.powermock.api.easymock.PowerMock.verify;
 
+@SuppressWarnings("unchecked")
 public class RocksDBStoreTest {
     private static boolean enableBloomFilters = false;
     final static String DB_NAME = "db-name";
@@ -107,7 +108,7 @@ public class RocksDBStoreTest {
         final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         dir = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             dir,
             Serdes.String(),
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
index 2646c4c..95c88ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
@@ -59,7 +59,7 @@ public class RocksDBTimeOrderedWindowStoreTest {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
 
         recordCollector = new MockRecordCollector();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             baseDir,
             Serdes.String(),
             Serdes.Integer(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 5643cde..fff4ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -491,6 +491,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
         );
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testRestore() throws Exception {
         final long startTime = SEGMENT_INTERVAL * 2;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 97593e7..c7e5924 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -53,9 +53,10 @@ public class SegmentIteratorTest {
 
     private SegmentIterator<KeyValueSegment> iterator = null;
 
+    @SuppressWarnings("rawtypes")
     @Before
     public void before() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index 558f1c9..722cb69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -55,7 +55,7 @@ public class TimestampedSegmentsTest {
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             stateDirectory,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 8d39bf3..b19fdf5 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -59,8 +59,8 @@ import java.util.Map;
 
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
 
-public class InternalMockProcessorContext
-    extends AbstractProcessorContext
+public class InternalMockProcessorContext<KOut, VOut>
+    extends AbstractProcessorContext<KOut, VOut>
     implements RecordCollector.Supplier {
 
     private StateManager stateManager = new StateManagerStub();
@@ -290,13 +290,13 @@ public class InternalMockProcessorContext
     public void commit() {}
 
     @Override
-    public <K, V> void forward(final Record<K, V> record) {
+    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record) {
         forward(record, null);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final Record<K, V> record, final String childName) {
+    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record, final String childName) {
         if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
             setTime(record.timestamp());
         }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 370dca7..c32c136 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -40,7 +40,7 @@ import java.util.Optional;
 import java.util.Properties;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
-public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
 
     private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
     private ProcessorNode currentNode;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index a75c250..4ab4cb8b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -53,7 +53,7 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KOut, VOut> context) {
         super.init(context);
         initialized = true;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 9d22e3b..f52134e 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
+public class MockSourceNode<KIn, VIn> extends SourceNode<KIn, VIn> {
 
     private static final String NAME = "MOCK-SOURCE-";
     private static final AtomicInteger INDEX = new AtomicInteger(1);
@@ -47,7 +47,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KIn, VIn> context) {
         super.init(context);
         initialized = true;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index a3ec02b..b3243cd 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -44,7 +44,7 @@ import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
-public class NoOpProcessorContext extends AbstractProcessorContext {
+public class NoOpProcessorContext extends AbstractProcessorContext<Object, Object> {
     public boolean initialized;
     @SuppressWarnings("WeakerAccess")
     public Map<Object, Object> forwardedValues = new HashMap<>();