Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:02:55 UTC

[kafka] branch poc-478-ktable-1 created (now b4f09ab)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at b4f09ab  fix tests

This branch includes the following new commits:

     new 8707d5c  wip
     new ddd0f5d  builds
     new 06640c7  fix processorSupplier
     new 8264273  fixing tests
     new b4f09ab  fix tests

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[kafka] 01/05: wip

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8707d5c5b1b1806f1c92101665c8935fb43245f1
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 13:19:35 2021 -0500

    wip
---
 .../streams/kstream/internals/KTableFilter.java    | 74 ++++++++++++----------
 .../streams/kstream/internals/KTableImpl.java      | 20 +++++-
 .../internals/KTableNewProcessorSupplier.java      | 40 ++++++++++++
 .../kstream/internals/KTableValueGetter.java       |  4 +-
 ....java => NewTimestampedCacheFlushListener.java} | 29 +++++----
 .../internals/SessionCacheFlushListener.java       | 29 +++++++--
 .../internals/TimestampedCacheFlushListener.java   | 43 ++++++++-----
 .../internals/TimestampedTupleForwarder.java       | 42 +++++++-----
 .../internals/AbstractProcessorContext.java        |  2 +-
 .../internals/InternalProcessorContext.java        |  4 +-
 .../internals/InternalTopologyBuilder.java         | 51 ++++++++-------
 .../streams/processor/internals/ProcessorNode.java |  5 +-
 .../processor/internals/ProcessorTopology.java     | 32 +++++-----
 .../processor/internals/RecordDeserializer.java    |  6 +-
 .../streams/processor/internals/RecordQueue.java   |  6 +-
 .../streams/processor/internals/SinkNode.java      |  8 +--
 .../streams/processor/internals/SourceNode.java    |  6 +-
 .../streams/processor/internals/StreamTask.java    | 11 +++-
 .../state/internals/CacheFlushListener.java        |  8 +++
 19 files changed, 269 insertions(+), 151 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 76753a4..bbffea6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -17,23 +17,23 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
-    private final KTableImpl<K, ?, V> parent;
-    private final Predicate<? super K, ? super V> predicate;
+class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VIn> {
+    private final KTableImpl<KIn, ?, VIn> parent;
+    private final Predicate<? super KIn, ? super VIn> predicate;
     private final boolean filterNot;
     private final String queryableName;
     private boolean sendOldValues;
 
-    KTableFilter(final KTableImpl<K, ?, V> parent,
-                 final Predicate<? super K, ? super V> predicate,
+    KTableFilter(final KTableImpl<KIn, ?, VIn> parent,
+                 final Predicate<? super KIn, ? super VIn> predicate,
                  final boolean filterNot,
                  final String queryableName) {
         this.parent = parent;
@@ -45,7 +45,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public Processor<K, Change<V>> get() {
+    public Processor<KIn, Change<VIn>, KIn, Change<VIn>> get() {
         return new KTableFilterProcessor();
     }
 
@@ -62,8 +62,8 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         return sendOldValues;
     }
 
-    private V computeValue(final K key, final V value) {
-        V newValue = null;
+    private VIn computeValue(final KIn key, final VIn value) {
+        VIn newValue = null;
 
         if (value != null && (filterNot ^ predicate.test(key, value))) {
             newValue = value;
@@ -72,11 +72,11 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         return newValue;
     }
 
-    private ValueAndTimestamp<V> computeValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
-        ValueAndTimestamp<V> newValueAndTimestamp = null;
+    private ValueAndTimestamp<VIn> computeValue(final KIn key, final ValueAndTimestamp<VIn> valueAndTimestamp) {
+        ValueAndTimestamp<VIn> newValueAndTimestamp = null;
 
         if (valueAndTimestamp != null) {
-            final V value = valueAndTimestamp.value();
+            final VIn value = valueAndTimestamp.value();
             if (filterNot ^ predicate.test(key, value)) {
                 newValueAndTimestamp = valueAndTimestamp;
             }
@@ -86,13 +86,14 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
 
-    private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
-        private TimestampedKeyValueStore<K, V> store;
-        private TimestampedTupleForwarder<K, V> tupleForwarder;
+    private class KTableFilterProcessor implements Processor<KIn, Change<VIn>, KIn, Change<VIn>> {
+        private ProcessorContext<KIn, Change<VIn>> context;
+        private TimestampedKeyValueStore<KIn, VIn> store;
+        private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;
 
         @Override
-        public void init(final ProcessorContext context) {
-            super.init(context);
+        public void init(final ProcessorContext<KIn, Change<VIn>> context) {
+            this.context = context;
             if (queryableName != null) {
                 store = context.getStateStore(queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
@@ -104,23 +105,26 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         }
 
         @Override
-        public void process(final K key, final Change<V> change) {
-            final V newValue = computeValue(key, change.newValue);
-            final V oldValue = computeOldValue(key, change);
+        public void process(Record<KIn, Change<VIn>> record) {
+            final KIn key = record.key();
+            final Change<VIn> change = record.value();
+
+            final VIn newValue = computeValue(key, change.newValue);
+            final VIn oldValue = computeOldValue(key, change);
 
             if (sendOldValues && oldValue == null && newValue == null) {
                 return; // unnecessary to forward here.
             }
 
             if (queryableName != null) {
-                store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
-                tupleForwarder.maybeForward(key, newValue, oldValue);
+                store.put(key, ValueAndTimestamp.make(newValue, record.timestamp()));
+                tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue)));
             } else {
-                context().forward(key, new Change<>(newValue, oldValue));
+                context.forward(record.withValue(new Change<>(newValue, oldValue)));
             }
         }
 
-        private V computeOldValue(final K key, final Change<V> change) {
+        private VIn computeOldValue(final KIn key, final Change<VIn> change) {
             if (!sendOldValues) {
                 return null;
             }
@@ -132,16 +136,16 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
+    public KTableValueGetterSupplier<KIn, VIn> view() {
         // if the KTable is materialized, use the materialized store to return getter value;
         // otherwise rely on the parent getter and apply filter on-the-fly
         if (queryableName != null) {
             return new KTableMaterializedValueGetterSupplier<>(queryableName);
         } else {
-            return new KTableValueGetterSupplier<K, V>() {
-                final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
+            return new KTableValueGetterSupplier<KIn, VIn>() {
+                final KTableValueGetterSupplier<KIn, VIn> parentValueGetterSupplier = parent.valueGetterSupplier();
 
-                public KTableValueGetter<K, V> get() {
+                public KTableValueGetter<KIn, VIn> get() {
                     return new KTableFilterValueGetter(parentValueGetterSupplier.get());
                 }
 
@@ -154,20 +158,20 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     }
 
 
-    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
-        private final KTableValueGetter<K, V> parentGetter;
+    private class KTableFilterValueGetter implements KTableValueGetter<KIn, VIn> {
+        private final KTableValueGetter<KIn, VIn> parentGetter;
 
-        KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) {
+        KTableFilterValueGetter(final KTableValueGetter<KIn, VIn> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final ProcessorContext<Void, Void> context) {
             parentGetter.init(context);
         }
 
         @Override
-        public ValueAndTimestamp<V> get(final K key) {
+        public ValueAndTimestamp<VIn> get(final KIn key) {
             return computeValue(key, parentGetter.get(key));
         }
 
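The hunks above migrate KTableFilter from the old single-input Processor API to the new KIP-478 api.Processor, which types both input and output and delivers key, value, and timestamp together as a Record. For reference, a minimal sketch of that processor shape; the UppercaseProcessor name and the String type arguments are illustrative, not part of the commit:

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    // Illustrative only: a Processor<KIn, VIn, KOut, VOut> in the new API.
    class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            // There is no AbstractProcessor base class to extend in the new API,
            // so implementations keep the typed context themselves.
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            // The Record carries the timestamp, so withValue() preserves it on
            // forward without the old To.all().withTimestamp(...) dance.
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }
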
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 52f7b5f..a2b8702 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -125,6 +125,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     private static final String SINK_NAME = "KTABLE-SINK-";
 
     private final ProcessorSupplier<?, ?> processorSupplier;
+    private final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier;
 
     private final String queryableStoreName;
 
@@ -140,6 +141,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                       final InternalStreamsBuilder builder) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
         this.processorSupplier = processorSupplier;
+        this.newProcessorSupplier = null;
+        this.queryableStoreName = queryableStoreName;
+    }
+
+    public KTableImpl(final String name,
+                      final Serde<K> keySerde,
+                      final Serde<V> valueSerde,
+                      final Set<String> subTopologySourceNodes,
+                      final String queryableStoreName,
+                      final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier,
+                      final GraphNode graphNode,
+                      final InternalStreamsBuilder builder) {
+        super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
+        this.processorSupplier = null;
+        this.newProcessorSupplier = newProcessorSupplier;
         this.queryableStoreName = queryableStoreName;
     }
 
@@ -179,7 +195,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         }
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
 
-        final KTableProcessorSupplier<K, V, V> processorSupplier =
+        final KTableNewProcessorSupplier<K, V, K, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
         final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
@@ -194,7 +210,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         builder.addGraphNode(this.graphNode, tableNode);
 
-        return new KTableImpl<>(
+        return new KTableImpl<K, V, V>(
             name,
             keySerde,
             valueSerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
new file mode 100644
index 0000000..ef8ea30
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {
+
+    KTableValueGetterSupplier<KOut, VOut> view();
+
+    /**
+     * Potentially enables sending old values.
+     * <p>
+     * If {@code forceMaterialization} is {@code true}, the method will force the materialization of upstream nodes to
+     * enable sending old values.
+     * <p>
+     * If {@code forceMaterialization} is {@code false}, the method will only enable the sending of old values <i>if</i>
+     * an upstream node is already materialized.
+     *
+     * @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old
+     *                             values.
+     * @return {@code true} if sending old values is enabled, i.e. either because {@code forceMaterialization} was
+     * {@code true} or some upstream node is materialized.
+     */
+    boolean enableSendingOldValues(boolean forceMaterialization);
+}
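The javadoc above pins down the enableSendingOldValues contract. A hedged sketch of how an implementor such as KTableFilter might satisfy it; this is a fragment, and the queryableName, sendOldValues, and parent fields are assumed from the enclosing supplier rather than taken from the commit:

    @Override
    public boolean enableSendingOldValues(final boolean forceMaterialization) {
        if (queryableName != null) {
            // This node is materialized, so old values can be read back locally.
            sendOldValues = true;
            return true;
        }
        if (parent.enableSendingOldValues(forceMaterialization)) {
            // An upstream node is materialized, or was just forced to be.
            sendOldValues = true;
            return true;
        }
        // Nothing upstream is materialized and materialization was not forced.
        return false;
    }
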
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index 12145fa..c939234 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 public interface KTableValueGetter<K, V> {
 
-    void init(ProcessorContext context);
+    void init(ProcessorContext<Void, Void> context);
 
     ValueAndTimestamp<V> get(K key);
 
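Value getters only read; they never forward, which is what the retyped ProcessorContext<Void, Void> encodes. A minimal sketch of a getter against the new signature, assuming a hypothetical map-backed store:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.state.ValueAndTimestamp;

    // Illustrative only; not part of the commit.
    class InMemoryValueGetter<K, V> implements KTableValueGetter<K, V> {
        private final Map<K, ValueAndTimestamp<V>> data = new HashMap<>();

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            // <Void, Void> makes it a type error to forward from a getter.
        }

        @Override
        public ValueAndTimestamp<V> get(final K key) {
            return data.get(key);
        }

        public void close() {
            // Nothing to release in the in-memory sketch.
        }
    }
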
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
similarity index 61%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
index f40fdfe..d325459 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
@@ -16,31 +16,34 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext context;
+class NewTimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
+    private final InternalProcessorContext<KOut, Change<VOut>> context;
+
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
-    SessionCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+    NewTimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) {
+        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final Windowed<K> key,
-                      final V newValue,
-                      final V oldValue,
-                      final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
+        throw new RuntimeException("apply(Record) should be called instead of the legacy overload");
+    }
+
+    @Override
+    public void apply(Record<KOut, Change<VOut>> record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
+            context.forward(record);
         } finally {
             context.setCurrentNode(prev);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index f40fdfe..ceff4b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -19,25 +19,29 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext context;
+class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Windowed<KOut>, VOut> {
+    private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> context;
+
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
+    @SuppressWarnings("unchecked")
     SessionCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+        this.context = (InternalProcessorContext<Windowed<KOut>, Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final Windowed<K> key,
-                      final V newValue,
-                      final V oldValue,
+    public void apply(final Windowed<KOut> key,
+                      final VOut newValue,
+                      final VOut oldValue,
                       final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
             context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
@@ -45,4 +49,15 @@ class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>,
             context.setCurrentNode(prev);
         }
     }
+
+    @Override
+    public void apply(Record<Windowed<KOut>, Change<VOut>> record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            context.forward(record);
+        } finally {
+            context.setCurrentNode(prev);
+        }
+    }
 }
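With the new overload, a flushed entry arrives as one Record instead of the (key, newValue, oldValue, timestamp) quadruple. A hedged fragment showing the kind of Record a session-store flush might hand to apply(...); the key, the values, and the in-package listener variable are illustrative:

    // Assumes a SessionCacheFlushListener<String, Long> named `listener` in scope.
    final Windowed<String> windowedKey =
        new Windowed<>("user-1", new SessionWindow(10L, 20L));
    final Record<Windowed<String>, Change<Long>> flushed =
        new Record<>(windowedKey, new Change<>(2L, 1L), windowedKey.window().end());
    listener.apply(flushed); // forwarded through the node that registered the listener

Note that the legacy overload stamps the forward with the session window end, while the Record path trusts whatever timestamp the caller put on the Record.
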
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 5540376..6dbf435 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -16,36 +16,47 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
+    private final InternalProcessorContext<KOut, Change<VOut>> context;
 
-class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
-    private final InternalProcessorContext context;
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
-    TimestampedCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+    TimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) {
+        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
+    @SuppressWarnings("unchecked")
+    TimestampedCacheFlushListener(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
+        myNode = this.context.currentNode();
+    }
+
+    @Override
+    public void apply(Record<KOut, Change<VOut>> record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            context.forward(record);
+        } finally {
+            context.setCurrentNode(prev);
+        }
+    }
+
     @Override
-    public void apply(final K key,
-                      final ValueAndTimestamp<V> newValue,
-                      final ValueAndTimestamp<V> oldValue,
-                      final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(
-                key,
-                new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
-                To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
+            context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 910dd8f..729e9fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 /**
@@ -30,34 +32,42 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  * @param <V> the type of the value
  */
 class TimestampedTupleForwarder<K, V> {
-    private final ProcessorContext context;
-    private final boolean sendOldValues;
+    private final InternalProcessorContext<K, Change<V>> context;
     private final boolean cachingEnabled;
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     TimestampedTupleForwarder(final StateStore store,
-                              final ProcessorContext context,
+                              final ProcessorContext<K, Change<V>> context,
                               final TimestampedCacheFlushListener<K, V> flushListener,
                               final boolean sendOldValues) {
-        this.context = context;
-        this.sendOldValues = sendOldValues;
+        this.context = (InternalProcessorContext<K, Change<V>>) context;
         cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
-    public void maybeForward(final K key,
-                             final V newValue,
-                             final V oldValue) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TimestampedTupleForwarder(final StateStore store,
+                              final org.apache.kafka.streams.processor.ProcessorContext context,
+                              final TimestampedCacheFlushListener<K, V> flushListener,
+                              final boolean sendOldValues) {
+        this.context = (InternalProcessorContext) context;
+        cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
+    }
+
+    public void maybeForward(final Record<K, Change<V>> record) {
+        if (!cachingEnabled) {
+            context.forward(record);
+        }
+    }
+
+    public void maybeForward(K key, V value, V oldValue) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
+            context.forward(key, new Change<>(value, oldValue));
         }
     }
 
-    public void maybeForward(final K key,
-                             final V newValue,
-                             final V oldValue,
-                             final long timestamp) {
+    public void maybeForward(K key, V value, V oldValue, long newTimestamp) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(timestamp));
+            context.forward(key, new Change<>(value, oldValue), To.all().withTimestamp(newTimestamp));
         }
     }
 }
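TimestampedTupleForwarder now offers a Record-based maybeForward next to the legacy overloads, and it only forwards when caching is disabled; with caching on, the cache flush listener forwards on eviction or flush instead, so records are not emitted twice. A hedged usage fragment, assuming the store, tupleForwarder, queryableName, and context fields of the KTableFilter processor shown earlier:

    final Record<KIn, Change<VIn>> out = record.withValue(new Change<>(newValue, oldValue));
    if (queryableName != null) {
        store.put(record.key(), ValueAndTimestamp.make(newValue, record.timestamp()));
        // With caching enabled this is a no-op; the flush listener forwards later.
        tupleForwarder.maybeForward(out);
    } else {
        context.forward(out);
    }
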
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 09a2e31..37ffbdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-public abstract class AbstractProcessorContext implements InternalProcessorContext {
+public abstract class AbstractProcessorContext implements InternalProcessorContext<Object, Object> {
 
     private final TaskId taskId;
     private final String applicationId;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index ba5c580..88e47e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -34,8 +34,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
  * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext
-    extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, StateStoreContext {
+public interface InternalProcessorContext<KOut, VOut>
+    extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<KOut, VOut>, StateStoreContext {
 
     BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
     ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 838cff9..4a54f01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -243,7 +243,7 @@ public class InternalTopologyBuilder {
     // even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory
     private final Map<String, Pattern> topicToPatterns = new HashMap<>();
 
-    private class SourceNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
+    private class SourceNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, KIn, VIn> {
         private final List<String> topics;
         private final Pattern pattern;
         private final Deserializer<KIn> keyDeserializer;
@@ -291,7 +291,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
+        public ProcessorNode<KIn, VIn, KIn, VIn> build() {
             return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);
         }
 
@@ -305,7 +305,7 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private class SinkNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
+    private class SinkNodeFactory<KIn, VIn> extends NodeFactory<KIn, VIn, Void, Void> {
         private final Serializer<KIn> keySerializer;
         private final Serializer<VIn> valSerializer;
         private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
@@ -325,7 +325,7 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
+        public ProcessorNode<KIn, VIn, Void, Void> build() {
             if (topicExtractor instanceof StaticTopicNameExtractor) {
                 final String topic = ((StaticTopicNameExtractor<KIn, VIn>) topicExtractor).topicName;
                 if (internalTopicNamesWithProperties.containsKey(topic)) {
@@ -761,12 +761,12 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private Set<SourceNodeFactory<?, ?, ?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
-        final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodes = new HashSet<>();
+    private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
+        final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>();
         for (final String predecessor : predecessors) {
             final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(predecessor);
             if (nodeFactory instanceof SourceNodeFactory) {
-                sourceNodes.add((SourceNodeFactory<?, ?, ?, ?>) nodeFactory);
+                sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
             } else if (nodeFactory instanceof ProcessorNodeFactory) {
                 sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory).predecessors));
             }
@@ -787,10 +787,10 @@ public class InternalTopologyBuilder {
 
         final Set<String> sourceTopics = new HashSet<>();
         final Set<Pattern> sourcePatterns = new HashSet<>();
-        final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodesForPredecessor =
+        final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor =
             findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
 
-        for (final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
+        for (final SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
             if (sourceNodeFactory.pattern != null) {
                 sourcePatterns.add(sourceNodeFactory.pattern);
             } else {
@@ -925,8 +925,8 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(applicationId, "topology has not completed optimization");
 
         final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>();
-        final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new HashMap<>();
-        final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>();
+        final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
+        final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
         final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
         final Set<String> repartitionTopics = new HashSet<>();
 
@@ -946,15 +946,15 @@ public class InternalTopologyBuilder {
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
                                     repartitionTopics,
-                                    (SourceNodeFactory<?, ?, ?, ?>) factory,
-                                    (SourceNode<?, ?, ?, ?>) node);
+                                    (SourceNodeFactory<?, ?>) factory,
+                                    (SourceNode<?, ?>) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
                     buildSinkNode(processorMap,
                                   topicSinkMap,
                                   repartitionTopics,
-                                  (SinkNodeFactory<?, ?, ?, ?>) factory,
-                                  (SinkNode<Object, Object, ?, ?>) node);
+                                  (SinkNodeFactory<?, ?>) factory,
+                                  (SinkNode<?, ?>) node);
                 } else {
                     throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                 }
@@ -971,13 +971,16 @@ public class InternalTopologyBuilder {
     }
 
     private void buildSinkNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
-                               final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap,
+                               final Map<String, SinkNode<?, ?>> topicSinkMap,
                                final Set<String> repartitionTopics,
-                               final SinkNodeFactory<?, ?, ?, ?> sinkNodeFactory,
-                               final SinkNode<Object, Object, ?, ?> node) {
+                               final SinkNodeFactory<?, ?> sinkNodeFactory,
+                               final SinkNode<?, ?> node) {
+        @SuppressWarnings("unchecked") final ProcessorNode<Object, Object, ?, ?> sinkNode =
+            (ProcessorNode<Object, Object, ?, ?>) node;
 
         for (final String predecessorName : sinkNodeFactory.predecessors) {
-            getProcessor(processorMap, predecessorName).addChild(node);
+            final ProcessorNode<Object, Object, Object, Object> processor = getProcessor(processorMap, predecessorName);
+            processor.addChild(sinkNode);
             if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) {
                 final String topic = ((StaticTopicNameExtractor<?, ?>) sinkNodeFactory.topicExtractor).topicName;
 
@@ -1002,10 +1005,10 @@ public class InternalTopologyBuilder {
         return (ProcessorNode<KIn, VIn, KOut, VOut>) processorMap.get(predecessor);
     }
 
-    private void buildSourceNode(final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap,
+    private void buildSourceNode(final Map<String, SourceNode<?, ?>> topicSourceMap,
                                  final Set<String> repartitionTopics,
-                                 final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory,
-                                 final SourceNode<?, ?, ?, ?> node) {
+                                 final SourceNodeFactory<?, ?> sourceNodeFactory,
+                                 final SourceNode<?, ?> node) {
 
         final List<String> topics = (sourceNodeFactory.pattern != null) ?
             sourceNodeFactory.getTopics(subscriptionUpdates()) :
@@ -1168,7 +1171,7 @@ public class InternalTopologyBuilder {
     private void setRegexMatchedTopicsToSourceNodes() {
         if (hasSubscriptionUpdates()) {
             for (final String nodeName : nodeToSourcePatterns.keySet()) {
-                final SourceNodeFactory<?, ?, ?, ?> sourceNode = (SourceNodeFactory<?, ?, ?, ?>) nodeFactories.get(nodeName);
+                final SourceNodeFactory<?, ?> sourceNode = (SourceNodeFactory<?, ?>) nodeFactories.get(nodeName);
                 final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates);
                 //need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex
                 nodeToSourceTopics.put(nodeName, sourceTopics);
@@ -1358,7 +1361,7 @@ public class InternalTopologyBuilder {
         final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(nodeName);
 
         if (nodeFactory instanceof SourceNodeFactory) {
-            final List<String> topics = ((SourceNodeFactory<?, ?, ?, ?>) nodeFactory).topics;
+            final List<String> topics = ((SourceNodeFactory<?, ?>) nodeFactory).topics;
             return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0));
         }
         return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index a8c32c7..80fdc4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -105,8 +105,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         childByName.put(child.name, child);
     }
 
-    @SuppressWarnings("unchecked")
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KOut, VOut> context) {
         if (!closed)
             throw new IllegalStateException("The processor is not closed");
 
@@ -116,7 +115,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init((ProcessorContext<KOut, VOut>) context);
+                        processor.init(context);
                     }
                 },
                 time,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 0a0118a..8f1a460 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -33,9 +33,9 @@ public class ProcessorTopology {
     private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class);
 
     private final List<ProcessorNode<?, ?, ?, ?>> processorNodes;
-    private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByName;
-    private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic;
-    private final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic;
+    private final Map<String, SourceNode<?, ?>> sourceNodesByName;
+    private final Map<String, SourceNode<?, ?>> sourceNodesByTopic;
+    private final Map<String, SinkNode<?, ?>> sinksByTopic;
     private final Set<String> terminalNodes;
     private final List<StateStore> stateStores;
     private final Set<String> stateStoreNames;
@@ -46,8 +46,8 @@ public class ProcessorTopology {
     private final Map<String, String> storeToChangelogTopic;
 
     public ProcessorTopology(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
-                             final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic,
-                             final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic,
+                             final Map<String, SourceNode<?, ?>> sourceNodesByTopic,
+                             final Map<String, SinkNode<?, ?>> sinksByTopic,
                              final List<StateStore> stateStores,
                              final List<StateStore> globalStateStores,
                              final Map<String, String> storeToChangelogTopic,
@@ -69,7 +69,7 @@ public class ProcessorTopology {
         }
 
         this.sourceNodesByName = new HashMap<>();
-        for (final SourceNode<?, ?, ?, ?> source : sourceNodesByTopic.values()) {
+        for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) {
             sourceNodesByName.put(source.name(), source);
         }
     }
@@ -78,11 +78,11 @@ public class ProcessorTopology {
         return sourceNodesByTopic.keySet();
     }
 
-    public SourceNode<?, ?, ?, ?> source(final String topic) {
+    public SourceNode<?, ?> source(final String topic) {
         return sourceNodesByTopic.get(topic);
     }
 
-    public Set<SourceNode<?, ?, ?, ?>> sources() {
+    public Set<SourceNode<?, ?>> sources() {
         return new HashSet<>(sourceNodesByTopic.values());
     }
 
@@ -90,7 +90,7 @@ public class ProcessorTopology {
         return sinksByTopic.keySet();
     }
 
-    public SinkNode<?, ?, ?, ?> sink(final String topic) {
+    public SinkNode<?, ?> sink(final String topic) {
         return sinksByTopic.get(topic);
     }
 
@@ -151,9 +151,9 @@ public class ProcessorTopology {
 
     public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
         sourceNodesByTopic.clear();
-        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {
             final String sourceNodeName = sourceNodeEntry.getKey();
-            final SourceNode<?, ?, ?, ?> sourceNode = sourceNodeEntry.getValue();
+            final SourceNode<?, ?> sourceNode = sourceNodeEntry.getValue();
 
             final List<String> updatedSourceTopics = allSourceTopicsByNodeName.get(sourceNodeName);
             if (updatedSourceTopics == null) {
@@ -211,10 +211,10 @@ public class ProcessorTopology {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
-        final Map<SourceNode<?, ?, ?, ?>, List<String>> sourceToTopics = new HashMap<>();
-        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) {
             final String topic = sourceNodeEntry.getKey();
-            final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getValue();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
             sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
             sourceToTopics.get(source).add(topic);
         }
@@ -222,8 +222,8 @@ public class ProcessorTopology {
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final Map.Entry<SourceNode<?, ?, ?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
-            final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getKey();
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
+            final SourceNode<?, ?> source = sourceNodeEntry.getKey();
             final List<String> topics = sourceNodeEntry.getValue();
             sb.append(source.toString(indent + "\t"))
                 .append(topicsToString(indent + "\t", topics))
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 20f1449..a965187 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -31,11 +31,11 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXC
 
 class RecordDeserializer {
     private final Logger log;
-    private final SourceNode<?, ?, ?, ?> sourceNode;
+    private final SourceNode<?, ?> sourceNode;
     private final Sensor droppedRecordsSensor;
     private final DeserializationExceptionHandler deserializationExceptionHandler;
 
-    RecordDeserializer(final SourceNode<?, ?, ?, ?> sourceNode,
+    RecordDeserializer(final SourceNode<?, ?> sourceNode,
                        final DeserializationExceptionHandler deserializationExceptionHandler,
                        final LogContext logContext,
                        final Sensor droppedRecordsSensor) {
@@ -100,7 +100,7 @@ class RecordDeserializer {
         }
     }
 
-    SourceNode<?, ?, ?, ?> sourceNode() {
+    SourceNode<?, ?> sourceNode() {
         return sourceNode;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index df7e834..6f0db8a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -39,7 +39,7 @@ public class RecordQueue {
     public static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
 
     private final Logger log;
-    private final SourceNode<?, ?, ?, ?> source;
+    private final SourceNode<?, ?> source;
     private final TopicPartition partition;
     private final ProcessorContext processorContext;
     private final TimestampExtractor timestampExtractor;
@@ -52,7 +52,7 @@ public class RecordQueue {
     private final Sensor droppedRecordsSensor;
 
     RecordQueue(final TopicPartition partition,
-                final SourceNode<?, ?, ?, ?> source,
+                final SourceNode<?, ?> source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler deserializationExceptionHandler,
                 final InternalProcessorContext processorContext,
@@ -85,7 +85,7 @@ public class RecordQueue {
      *
      * @return SourceNode
      */
-    public SourceNode<?, ?, ?, ?> source() {
+    public SourceNode<?, ?> source() {
         return source;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2efa537..9091f3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -24,14 +24,14 @@ import org.apache.kafka.streams.processor.api.Record;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer;
 
-public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
+public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {
 
     private Serializer<KIn> keySerializer;
     private Serializer<VIn> valSerializer;
     private final TopicNameExtractor<KIn, VIn> topicExtractor;
     private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
 
-    private InternalProcessorContext context;
+    private InternalProcessorContext<Void, Void> context;
 
     SinkNode(final String name,
              final TopicNameExtractor<KIn, VIn> topicExtractor,
@@ -50,12 +50,12 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut
      * @throws UnsupportedOperationException if this method adds a child to a sink node
      */
     @Override
-    public void addChild(final ProcessorNode<KOut, VOut, ?, ?> child) {
+    public void addChild(final ProcessorNode<Void, Void, ?, ?> child) {
         throw new UnsupportedOperationException("sink node does not allow addChild");
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<Void, Void> context) {
         super.init(context);
         this.context = context;
         final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 7198f2f..9ff3473 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -26,9 +26,9 @@ import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer;
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer;
 
-public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
+public class SourceNode<KIn, VIn> extends ProcessorNode<KIn, VIn, KIn, VIn> {
 
-    private InternalProcessorContext context;
+    private InternalProcessorContext<KIn, VIn> context;
     private Deserializer<KIn> keyDeserializer;
     private Deserializer<VIn> valDeserializer;
     private final TimestampExtractor timestampExtractor;
@@ -59,7 +59,7 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KIn, VIn> context) {
         // It is important to first create the sensor before calling init on the
         // parent object. Otherwise due to backwards compatibility an empty sensor
         // without parent is created with the same name.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 129407a..ee5d1ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -102,7 +102,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor punctuateLatencySensor;
     private final Sensor bufferedRecordsSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
+
+    @SuppressWarnings("rawtypes")
     private final InternalProcessorContext processorContext;
+
     private final RecordQueueCreator recordQueueCreator;
 
     private StampedRecord record;
@@ -110,6 +113,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private boolean commitRequested = false;
     private boolean hasPendingTxCommit = false;
 
+    @SuppressWarnings("rawtypes")
     public StreamTask(final TaskId id,
                       final Set<TopicPartition> inputPartitions,
                       final ProcessorTopology topology,
@@ -317,6 +321,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void closeTopology() {
         log.trace("Closing processor topology");
 
@@ -805,6 +810,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      * @throws IllegalStateException if the current node is not null
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
+    @SuppressWarnings("unchecked")
     @Override
     public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
                           final long timestamp,
@@ -840,6 +846,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
                                         final long wallClockTime,
                                         final ProcessorRecordContext recordContext) {
@@ -933,6 +940,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return purgeableConsumedOffsets;
     }
 
+    @SuppressWarnings("unchecked")
     private void initializeTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
@@ -1107,6 +1115,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public InternalProcessorContext processorContext() {
         return processorContext;
     }
@@ -1230,7 +1239,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
 
         public RecordQueue createQueue(final TopicPartition partition) {
-            final SourceNode<?, ?, ?, ?> source = topology.source(partition.topic());
+            final SourceNode<?, ?> source = topology.source(partition.topic());
             if (source == null) {
                 throw new TopologyException(
                         "Topic is unknown to the topology. " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
index 7e5f11a..c86d216 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.api.Record;
+
 /**
  * Listen to cache flush events
  * @param <K> key type
@@ -31,4 +34,9 @@ public interface CacheFlushListener<K, V> {
      * @param timestamp   timestamp of new value
      */
     void apply(final K key, final V newValue, final V oldValue, final long timestamp);
+
+    /**
+     * Called when records are flushed from the {@link ThreadCache}
+     */
+    void apply(final Record<K, Change<V>> record);
 }
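
A quick illustration of the new contract: an implementor must now satisfy both the
legacy positional callback and the new Record-based callback. A minimal sketch,
assuming the classes referenced in the diff above are on the classpath; the class
name LoggingFlushListener is illustrative only, not part of the patch:

    import org.apache.kafka.streams.kstream.internals.Change;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.internals.CacheFlushListener;

    // Satisfies both apply() overloads of the extended interface.
    class LoggingFlushListener<K, V> implements CacheFlushListener<K, V> {
        @Override
        public void apply(final K key, final V newValue, final V oldValue, final long timestamp) {
            System.out.println("flushed " + key + " -> " + newValue + " (was " + oldValue + ") @" + timestamp);
        }

        @Override
        public void apply(final Record<K, Change<V>> record) {
            // The Record form carries key, change pair, and timestamp together.
            System.out.println("flushed " + record.key() + " -> " + record.value().newValue
                + " (was " + record.value().oldValue + ") @" + record.timestamp());
        }
    }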

[kafka] 04/05: fixing tests

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 82642732d400465084f42029e54ec703e2242318
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 15:32:31 2021 -0500

    fixing tests
---
 .../streams/kstream/internals/KTableFilter.java    |  4 +-
 .../streams/kstream/internals/KTableImpl.java      |  8 ++++
 .../NewTimestampedCacheFlushListener.java          | 51 ----------------------
 .../internals/SessionCacheFlushListener.java       |  2 +-
 .../internals/TimestampedCacheFlushListener.java   | 27 +++++++++---
 .../internals/TimestampedTupleForwarder.java       | 22 +++++++---
 .../org/apache/kafka/streams/processor/To.java     |  7 +++
 .../streams/processor/internals/ProcessorNode.java |  1 -
 .../state/internals/MeteredKeyValueStore.java      |  4 +-
 .../state/internals/MeteredSessionStore.java       |  4 +-
 .../state/internals/MeteredWindowStore.java        |  4 +-
 .../internals/SessionCacheFlushListenerTest.java   |  2 +-
 .../TimestampedCacheFlushListenerTest.java         |  8 ++--
 .../internals/TimestampedTupleForwarderTest.java   | 23 +++++++---
 .../state/internals/CacheFlushListenerStub.java    |  2 +-
 .../internals/CachingInMemorySessionStoreTest.java |  2 +-
 .../CachingPersistentSessionStoreTest.java         |  2 +-
 17 files changed, 87 insertions(+), 86 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index a23dce4..ecbd4703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -105,7 +105,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
         }
 
         @Override
-        public void process(Record<KIn, Change<VIn>> record) {
+        public void process(final Record<KIn, Change<VIn>> record) {
             final KIn key = record.key();
             final Change<VIn> change = record.value();
 
@@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
         }
 
         @Override
-        public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
             parentGetter.init(context);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9733511..01cd194 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -832,6 +832,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             return new KTableSourceValueGetterSupplier<>(source.queryableName());
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
+        } else if (processorSupplier instanceof KTableNewProcessorSupplier){
+            return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
         } else {
             return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
         }
@@ -848,6 +850,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                 source.enableSendingOldValues();
             } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
                 ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
+            } else if (processorSupplier instanceof KTableNewProcessorSupplier) {
+                final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier =
+                    (KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier;
+                if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
+                    return false;
+                }
             } else {
                 final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier;
                 if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
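
The two new branches above slot KTableNewProcessorSupplier in ahead of the legacy
KTableProcessorSupplier fallback; the shape is a plain instanceof chain. A stand-in
sketch using simplified placeholder interfaces (not the real Kafka types):

    // Placeholder interfaces showing the dispatch order used in
    // valueGetterSupplier() and enableSendingOldValues(): new-API suppliers
    // are matched before the legacy fallback.
    interface LegacySupplier { boolean enableSendingOldValues(boolean forceMaterialization); }
    interface NewApiSupplier { boolean enableSendingOldValues(boolean forceMaterialization); }

    final class SupplierDispatch {
        static boolean enable(final Object supplier, final boolean forceMaterialization) {
            if (supplier instanceof NewApiSupplier) {
                return ((NewApiSupplier) supplier).enableSendingOldValues(forceMaterialization);
            } else {
                return ((LegacySupplier) supplier).enableSendingOldValues(forceMaterialization);
            }
        }
    }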
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
deleted file mode 100644
index d325459..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.internals.CacheFlushListener;
-
-class NewTimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
-    private final InternalProcessorContext<KOut, Change<VOut>> context;
-
-    @SuppressWarnings("rawtypes")
-    private final ProcessorNode myNode;
-
-    NewTimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) {
-        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
-        myNode = this.context.currentNode();
-    }
-
-    @Override
-    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
-        throw new RuntimeException("ASDFASDF");
-    }
-
-    @Override
-    public void apply(Record<KOut, Change<VOut>> record) {
-        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
-        context.setCurrentNode(myNode);
-        try {
-            context.forward(record);
-        } finally {
-            context.setCurrentNode(prev);
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index ceff4b7..2792dd9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -51,7 +51,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window
     }
 
     @Override
-    public void apply(Record<Windowed<KOut>, Change<VOut>> record) {
+    public void apply(final Record<Windowed<KOut>, Change<VOut>> record) {
         @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 97ef6cb..4034414 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -24,7 +24,9 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, ValueAndTimestamp<VOut>> {
     private final InternalProcessorContext<KOut, Change<VOut>> context;
 
     @SuppressWarnings("rawtypes")
@@ -42,22 +44,35 @@ class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KO
     }
 
     @Override
-    public void apply(Record<KOut, Change<VOut>> record) {
-        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
+    public void apply(final KOut key,
+                      final ValueAndTimestamp<VOut> newValue,
+                      final ValueAndTimestamp<VOut> oldValue,
+                      final long timestamp) {
+        final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(record);
+            context.forward(
+                key,
+                new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
+                To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
         } finally {
             context.setCurrentNode(prev);
         }
     }
 
     @Override
-    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
+    public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> record) {
         @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
+            context.forward(
+                record.withValue(
+                    new Change<>(
+                        getValueOrNull(record.value().newValue),
+                        getValueOrNull(record.value().oldValue)
+                    )
+                )
+            );
         } finally {
             context.setCurrentNode(prev);
         }
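
The listener now unwraps ValueAndTimestamp before forwarding: getValueOrNull guards
against a null wrapper, and the forwarded timestamp prefers the new value's own
timestamp, falling back to the flush timestamp. That selection logic in isolation,
with stand-in types (illustrative names, not the Kafka classes):

    // Stand-in for ValueAndTimestamp plus the unwrap/timestamp-pick helpers.
    final class ValueAndTs<V> {
        final V value;
        final long timestamp;
        ValueAndTs(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    final class Unwrap {
        // Mirrors ValueAndTimestamp.getValueOrNull().
        static <V> V getValueOrNull(final ValueAndTs<V> valueAndTimestamp) {
            return valueAndTimestamp == null ? null : valueAndTimestamp.value;
        }

        // Mirrors the To.all().withTimestamp(...) choice above.
        static <V> long forwardTimestamp(final ValueAndTs<V> newValue, final long flushTimestamp) {
            return newValue != null ? newValue.timestamp : flushTimestamp;
        }
    }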
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 729e9fd..e5733eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  */
 class TimestampedTupleForwarder<K, V> {
     private final InternalProcessorContext<K, Change<V>> context;
+    private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -41,6 +42,7 @@ class TimestampedTupleForwarder<K, V> {
                               final TimestampedCacheFlushListener<K, V> flushListener,
                               final boolean sendOldValues) {
         this.context = (InternalProcessorContext<K, Change<V>>) context;
+        this.sendOldValues = sendOldValues;
         cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
@@ -50,24 +52,34 @@ class TimestampedTupleForwarder<K, V> {
                               final TimestampedCacheFlushListener<K, V> flushListener,
                               final boolean sendOldValues) {
         this.context = (InternalProcessorContext) context;
+        this.sendOldValues = sendOldValues;
         cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
     public void maybeForward(final Record<K, Change<V>> record) {
         if (!cachingEnabled) {
-            context.forward(record);
+            if(sendOldValues) {
+                context.forward(record);
+            } else {
+                context.forward(record.withValue(new Change<>(record.value().newValue, null)));
+            }
         }
     }
 
-    public void maybeForward(K key, V value, V oldValue) {
+    public void maybeForward(final K key,
+                             final V newValue,
+                             final V oldValue) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(value, oldValue));
+            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
         }
     }
 
-    public void maybeForward(K key, V value, V oldValue, long newTimestamp) {
+    public void maybeForward(final K key,
+                             final V newValue,
+                             final V oldValue,
+                             final long timestamp) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(value, oldValue), To.all().withTimestamp(newTimestamp));
+            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(timestamp));
         }
     }
 }
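
The behavioral fix in this file: when caching is disabled there is no flush listener
to enforce sendOldValues, so the forwarder must strip the old value itself rather
than leak it downstream. The guard in isolation, with a stand-in Change type:

    // Stand-in for Change<V>: when sendOldValues is off, null out oldValue
    // before forwarding, exactly as maybeForward() now does.
    final class ChangePair<V> {
        final V newValue;
        final V oldValue;
        ChangePair(final V newValue, final V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    final class ForwardGuard {
        static <V> ChangePair<V> toForward(final ChangePair<V> change, final boolean sendOldValues) {
            return sendOldValues ? change : new ChangePair<>(change.newValue, null);
        }
    }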
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
index fe19dbf..69c0c5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -89,4 +89,11 @@ public class To {
         throw new UnsupportedOperationException("To is unsafe for use in Hash collections");
     }
 
+    @Override
+    public String toString() {
+        return "To{" +
+               "childName='" + childName + '\'' +
+               ", timestamp=" + timestamp +
+               '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 80fdc4f..f221c57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 62542e6..94624a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -177,7 +177,7 @@ public class MeteredKeyValueStore<K, V>
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
                 new CacheFlushListener<byte[], byte[]>() {
                     @Override
-                    public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) {
+                    public void apply(final byte[] rawKey, final byte[] rawNewValue, final byte[] rawOldValue, final long timestamp) {
                         listener.apply(
                             serdes.keyFrom(rawKey),
                             rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
@@ -187,7 +187,7 @@ public class MeteredKeyValueStore<K, V>
                     }
 
                     @Override
-                    public void apply(Record<byte[], Change<byte[]>> record) {
+                    public void apply(final Record<byte[], Change<byte[]>> record) {
                         listener.apply(
                             record.withKey(serdes.keyFrom(record.key()))
                             .withValue(new Change<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index d305951..3b858e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -148,7 +148,7 @@ public class MeteredSessionStore<K, V>
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
                 new CacheFlushListener<byte[], byte[]>() {
                     @Override
-                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                    public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) {
                         listener.apply(
                             SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
                             newValue != null ? serdes.valueFrom(newValue) : null,
@@ -158,7 +158,7 @@ public class MeteredSessionStore<K, V>
                     }
 
                     @Override
-                    public void apply(Record<byte[], Change<byte[]>> record) {
+                    public void apply(final Record<byte[], Change<byte[]>> record) {
                         listener.apply(
                             record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic()))
                                   .withValue(new Change<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 82f65a6..f77bf4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -152,7 +152,7 @@ public class MeteredWindowStore<K, V>
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
                 new CacheFlushListener<byte[], byte[]>() {
                     @Override
-                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                    public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) {
                         listener.apply(
                             WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
                             newValue != null ? serdes.valueFrom(newValue) : null,
@@ -162,7 +162,7 @@ public class MeteredWindowStore<K, V>
                     }
 
                     @Override
-                    public void apply(Record<byte[], Change<byte[]>> record) {
+                    public void apply(final Record<byte[], Change<byte[]>> record) {
                         listener.apply(
                             record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
                                   .withValue(new Change<>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index a826d50..c60bcf4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
 public class SessionCacheFlushListenerTest {
     @Test
     public void shouldForwardKeyNewValueOldValueAndTimestamp() {
-        final InternalProcessorContext<Windowed<String>,Change<String>> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<Windowed<String>, Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 7c1b0e7..7c25b2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -32,7 +32,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardValueTimestampIfNewValueExists() {
-        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -43,7 +43,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply(
             "key",
             ValueAndTimestamp.make("newValue", 42L),
             ValueAndTimestamp.make("oldValue", 21L),
@@ -54,7 +54,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardParameterTimestampIfNewValueIsNull() {
-        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -65,7 +65,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply(
             "key",
             null,
             ValueAndTimestamp.make("oldValue", 21L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index dc2767c..8b9dccb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.junit.Test;
@@ -39,14 +40,14 @@ public class TimestampedTupleForwarderTest {
 
     private void setFlushListener(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
-        final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class);
+        final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
 
         expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
         replay(store);
 
         new TimestampedTupleForwarder<>(
             store,
-            (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null,
+            (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<Object>>) null,
             flushListener,
             sendOldValues
         );
@@ -62,7 +63,7 @@ public class TimestampedTupleForwarderTest {
 
     private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext context = mock(ProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
 
         expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
         if (sendOldValues) {
@@ -76,7 +77,12 @@ public class TimestampedTupleForwarderTest {
         replay(store, context);
 
         final TimestampedTupleForwarder<String, String> forwarder =
-            new TimestampedTupleForwarder<>(store, context, null, sendOldValues);
+            new TimestampedTupleForwarder<>(
+                store,
+                (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
+                null,
+                sendOldValues
+            );
         forwarder.maybeForward("key1", "newValue1", "oldValue1");
         forwarder.maybeForward("key2", "newValue2", "oldValue2", 42L);
 
@@ -86,13 +92,18 @@ public class TimestampedTupleForwarderTest {
     @Test
     public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext context = mock(ProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
 
         expect(store.setFlushListener(null, false)).andReturn(true);
         replay(store, context);
 
         final TimestampedTupleForwarder<String, String> forwarder =
-            new TimestampedTupleForwarder<>(store, context, null, false);
+            new TimestampedTupleForwarder<>(
+                store,
+                (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
+                null,
+                false
+            );
         forwarder.maybeForward("key", "newValue", "oldValue");
         forwarder.maybeForward("key", "newValue", "oldValue", 42L);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
index fba59cf..b214739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
@@ -49,7 +49,7 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[],
     }
 
     @Override
-    public void apply(Record<byte[], Change<byte[]>> record) {
+    public void apply(final Record<byte[], Change<byte[]>> record) {
         forwarded.put(
             keyDeserializer.deserialize(null, record.key()),
             new Change<>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index 5885a59..417b35f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -755,7 +755,7 @@ public class CachingInMemorySessionStoreTest {
         }
 
         @Override
-        public void apply(Record<byte[], Change<byte[]>> record) {
+        public void apply(final Record<byte[], Change<byte[]>> record) {
             forwarded.add(
                 new KeyValueTimestamp<>(
                     keyDeserializer.deserialize(null, record.key()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 224c8bd..55018bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -766,7 +766,7 @@ public class CachingPersistentSessionStoreTest {
         }
 
         @Override
-        public void apply(Record<byte[], Change<byte[]>> record) {
+        public void apply(final Record<byte[], Change<byte[]>> record) {
             forwarded.add(
                 new KeyValueTimestamp<>(
                     keyDeserializer.deserialize(null, record.key()),

[kafka] 05/05: fix tests

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b4f09abdec9ebc29ae028cce25b977ac0ed87061
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 16:01:56 2021 -0500

    fix tests
---
 .../java/org/apache/kafka/streams/kstream/internals/KTableFilter.java   | 2 ++
 .../java/org/apache/kafka/streams/kstream/internals/KTableImpl.java     | 2 +-
 .../kafka/streams/kstream/internals/SessionCacheFlushListener.java      | 2 +-
 .../kafka/streams/kstream/internals/TimestampedTupleForwarder.java      | 2 +-
 .../kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java  | 1 -
 .../streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java    | 2 +-
 6 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index ecbd4703..3d97408 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -167,6 +167,8 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
 
         @Override
         public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+            // This is the old processor context for compatibility with the other KTable processors.
+            // Once we migrate them all, we can swap this out.
             parentGetter.init(context);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 01cd194..faee7af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -832,7 +832,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             return new KTableSourceValueGetterSupplier<>(source.queryableName());
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
-        } else if (processorSupplier instanceof KTableNewProcessorSupplier){
+        } else if (processorSupplier instanceof KTableNewProcessorSupplier) {
             return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
         } else {
             return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index 2792dd9..daa7c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -55,7 +55,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window
         @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(record);
+            context.forward(record.withTimestamp(record.key().window().end()));
         } finally {
             context.setCurrentNode(prev);
         }
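
The one-line change above pins a flushed session result's timestamp to its window
end, i.e. the latest event time observed in the session. Restated as a small helper
over the real types (SessionStamp is an illustrative name, not part of the patch):

    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.internals.Change;
    import org.apache.kafka.streams.processor.api.Record;

    final class SessionStamp {
        // Re-timestamp a flushed session result to its window end before
        // forwarding, as SessionCacheFlushListener.apply() does above.
        static <K, V> Record<Windowed<K>, Change<V>> stamp(final Record<Windowed<K>, Change<V>> record) {
            return record.withTimestamp(record.key().window().end());
        }
    }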
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index e5733eb..6411b35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -58,7 +58,7 @@ class TimestampedTupleForwarder<K, V> {
 
     public void maybeForward(final Record<K, Change<V>> record) {
         if (!cachingEnabled) {
-            if(sendOldValues) {
+            if (sendOldValues) {
                 context.forward(record);
             } else {
                 context.forward(record.withValue(new Change<>(record.value().newValue, null)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 8b9dccb..89b732e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 9e2532d..5f524fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -46,7 +46,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-@SuppressWarnings("ALL")
+@SuppressWarnings("rawtypes")
 public class ChangeLoggingKeyValueBytesStoreTest {
 
     private final MockRecordCollector collector = new MockRecordCollector();

[kafka] 02/05: builds

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ddd0f5d0dd801bbcf9b0c7206e67a8dcfd4db7a4
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 14:25:24 2021 -0500

    builds
---
 .../streams/kstream/internals/KTableFilter.java    |  2 +-
 .../kstream/internals/KTableValueGetter.java       |  4 +--
 .../internals/TimestampedCacheFlushListener.java   |  1 +
 .../ForeignJoinSubscriptionProcessorSupplier.java  |  2 +-
 .../SubscriptionStoreReceiveProcessorSupplier.java |  2 +-
 .../internals/AbstractProcessorContext.java        |  2 +-
 .../internals/GlobalProcessorContextImpl.java      |  2 +-
 .../processor/internals/GlobalStateUpdateTask.java |  5 ++--
 .../processor/internals/ProcessorContextImpl.java  |  2 +-
 .../state/internals/MeteredKeyValueStore.java      | 30 +++++++++++++++++-----
 .../state/internals/MeteredSessionStore.java       | 30 +++++++++++++++++-----
 .../state/internals/MeteredWindowStore.java        | 30 +++++++++++++++++-----
 .../streams/kstream/internals/KStreamImplTest.java |  8 +++---
 ...KStreamSessionWindowAggregateProcessorTest.java |  2 +-
 .../kstream/internals/KTableReduceTest.java        |  2 +-
 .../internals/SessionCacheFlushListenerTest.java   |  2 +-
 .../TimestampedCacheFlushListenerTest.java         |  9 ++++---
 .../internals/TimestampedTupleForwarderTest.java   |  9 +++++--
 .../internals/AbstractProcessorContextTest.java    |  2 +-
 .../processor/internals/GlobalStateTaskTest.java   |  6 ++---
 .../processor/internals/ProcessorNodeTest.java     |  8 +++---
 .../internals/ProcessorTopologyFactories.java      |  2 +-
 .../internals/RecordDeserializerTest.java          |  2 +-
 .../processor/internals/RecordQueueTest.java       |  6 +++--
 .../streams/processor/internals/SinkNodeTest.java  |  8 +++---
 .../processor/internals/SourceNodeTest.java        |  8 +++---
 .../processor/internals/StreamTaskTest.java        | 22 ++++++++--------
 .../streams/state/KeyValueStoreTestDriver.java     |  1 +
 .../state/internals/AbstractKeyValueStoreTest.java |  2 ++
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  2 +-
 .../internals/AbstractSessionBytesStoreTest.java   |  3 ++-
 .../internals/AbstractWindowBytesStoreTest.java    |  2 +-
 .../state/internals/CacheFlushListenerStub.java    | 12 +++++++++
 .../CachingInMemoryKeyValueStoreTest.java          |  4 +--
 .../internals/CachingInMemorySessionStoreTest.java | 18 +++++++++++--
 .../CachingPersistentSessionStoreTest.java         | 18 +++++++++++--
 .../CachingPersistentWindowStoreTest.java          |  4 +--
 .../ChangeLoggingKeyValueBytesStoreTest.java       |  3 ++-
 ...geLoggingTimestampedKeyValueBytesStoreTest.java |  3 ++-
 .../CompositeReadOnlyKeyValueStoreTest.java        | 11 ++++++--
 .../state/internals/InMemoryKeyValueStoreTest.java |  1 +
 .../state/internals/InMemoryLRUCacheStoreTest.java |  1 +
 .../state/internals/InMemoryWindowStoreTest.java   |  1 +
 .../state/internals/KeyValueSegmentsTest.java      |  2 +-
 .../MeteredTimestampedWindowStoreTest.java         |  2 +-
 .../state/internals/MeteredWindowStoreTest.java    |  2 +-
 .../streams/state/internals/RocksDBStoreTest.java  |  3 ++-
 .../RocksDBTimeOrderedWindowStoreTest.java         |  2 +-
 .../state/internals/RocksDBWindowStoreTest.java    |  1 +
 .../state/internals/SegmentIteratorTest.java       |  3 ++-
 .../state/internals/TimestampedSegmentsTest.java   |  2 +-
 .../kafka/test/InternalMockProcessorContext.java   |  8 +++---
 .../kafka/test/MockInternalProcessorContext.java   |  2 +-
 .../org/apache/kafka/test/MockProcessorNode.java   |  2 +-
 .../java/org/apache/kafka/test/MockSourceNode.java |  4 +--
 .../apache/kafka/test/NoOpProcessorContext.java    |  2 +-
 56 files changed, 227 insertions(+), 102 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index bbffea6..a23dce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
         }
 
         @Override
-        public void init(final ProcessorContext<Void, Void> context) {
+        public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
             parentGetter.init(context);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index c939234..12145fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 public interface KTableValueGetter<K, V> {
 
-    void init(ProcessorContext<Void, Void> context);
+    void init(ProcessorContext context);
 
     ValueAndTimestamp<V> get(K key);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 6dbf435..97ef6cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
 class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index fd95105..3c12afa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -63,7 +63,7 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements Proc
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+            final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
             droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                 Thread.currentThread().getName(),
                 internalProcessorContext.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 61fb1c1..98bcd4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -60,7 +60,7 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
             @Override
             public void init(final ProcessorContext context) {
                 super.init(context);
-                final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+                final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
 
                 droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                     Thread.currentThread().getName(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 37ffbdc..79b2d0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-public abstract class AbstractProcessorContext implements InternalProcessorContext<Object, Object> {
+public abstract class AbstractProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut, VOut> {
 
     private final TaskId taskId;
     private final String applicationId;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index be3cf55..dbdd6a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -34,7 +34,7 @@ import java.time.Duration;
 
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
-public class GlobalProcessorContextImpl extends AbstractProcessorContext {
+public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> {
 
     private final GlobalStateManager stateManager;
     private final Time time;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 6b1378b..e5f591c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -69,7 +69,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
-            final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic);
+            final SourceNode<?, ?> source = topology.source(sourceTopic);
             deserializers.put(
                 sourceTopic,
                 new RecordDeserializer(
@@ -111,7 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
+            ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
         }
 
         offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
@@ -138,6 +138,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void initTopology() {
         for (final ProcessorNode<?, ?, ?, ?> node : this.topology.processors()) {
             processorContext.setCurrentNode(node);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index dcae2ab..bd7ece4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -42,7 +42,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDur
 import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
 import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
-public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {
     // the below are null for standby tasks
     private StreamTask streamTask;
     private RecordCollector collector;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 18c44e8..62542e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -173,12 +175,28 @@ public class MeteredKeyValueStore<K, V>
         final KeyValueStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
-                (rawKey, rawNewValue, rawOldValue, timestamp) -> listener.apply(
-                    serdes.keyFrom(rawKey),
-                    rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
-                    rawOldValue != null ? serdes.valueFrom(rawOldValue) : null,
-                    timestamp
-                ),
+                new CacheFlushListener<byte[], byte[]>() {
+                    @Override
+                    public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) {
+                        listener.apply(
+                            serdes.keyFrom(rawKey),
+                            rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
+                            rawOldValue != null ? serdes.valueFrom(rawOldValue) : null,
+                            timestamp
+                        );
+                    }
+
+                    @Override
+                    public void apply(Record<byte[], Change<byte[]>> record) {
+                        listener.apply(
+                            record.withKey(serdes.keyFrom(record.key()))
+                            .withValue(new Change<>(
+                                record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+                                record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+                            ))
+                        );
+                    }
+                },
                 sendOldValues);
         }
         return false;
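
Why the lambda becomes an anonymous class here: once CacheFlushListener declares a
second abstract method it is no longer a functional interface, so a lambda can no
longer implement it. A toy illustration (interface names are made up):

    // One abstract method: lambda compiles. Two abstract methods: only an
    // anonymous (or named) class can implement the interface.
    interface OneMethod { void apply(String key); }

    interface TwoMethods {
        void apply(String key);
        void apply(String key, long timestamp);
    }

    final class FunctionalDemo {
        static void demo() {
            final OneMethod ok = key -> System.out.println(key);  // fine
            // final TwoMethods bad = key -> System.out.println(key);  // does not compile
            final TwoMethods verbose = new TwoMethods() {
                @Override
                public void apply(final String key) {
                    System.out.println(key);
                }
                @Override
                public void apply(final String key, final long timestamp) {
                    System.out.println(key + " @ " + timestamp);
                }
            };
            ok.apply("k");
            verbose.apply("k", 42L);
        }
    }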
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 1fbc8db..d305951 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -144,12 +146,28 @@ public class MeteredSessionStore<K, V>
         final SessionStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
-                (key, newValue, oldValue, timestamp) -> listener.apply(
-                    SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
-                    newValue != null ? serdes.valueFrom(newValue) : null,
-                    oldValue != null ? serdes.valueFrom(oldValue) : null,
-                    timestamp
-                ),
+                new CacheFlushListener<byte[], byte[]>() {
+                    @Override
+                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                        listener.apply(
+                            SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
+                            newValue != null ? serdes.valueFrom(newValue) : null,
+                            oldValue != null ? serdes.valueFrom(oldValue) : null,
+                            timestamp
+                        );
+                    }
+
+                    @Override
+                    public void apply(Record<byte[], Change<byte[]>> record) {
+                        listener.apply(
+                            record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic()))
+                                  .withValue(new Change<>(
+                                      record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+                                      record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+                                  ))
+                        );
+                    }
+                },
                 sendOldValues);
         }
         return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 91b4387..82f65a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -148,12 +150,28 @@ public class MeteredWindowStore<K, V>
         final WindowStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
-                (key, newValue, oldValue, timestamp) -> listener.apply(
-                    WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
-                    newValue != null ? serdes.valueFrom(newValue) : null,
-                    oldValue != null ? serdes.valueFrom(oldValue) : null,
-                    timestamp
-                ),
+                new CacheFlushListener<byte[], byte[]>() {
+                    @Override
+                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                        listener.apply(
+                            WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
+                            newValue != null ? serdes.valueFrom(newValue) : null,
+                            oldValue != null ? serdes.valueFrom(oldValue) : null,
+                            timestamp
+                        );
+                    }
+
+                    @Override
+                    public void apply(Record<byte[], Change<byte[]>> record) {
+                        listener.apply(
+                            record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
+                                  .withValue(new Change<>(
+                                      record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+                                      record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+                                  ))
+                        );
+                    }
+                },
                 sendOldValues);
         }
         return false;
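
[Aside: the record.withKey(...).withValue(...) chains in all three Metered* listeners lean on Record being an immutable carrier whose copy-on-write setters leave the timestamp (and headers) untouched. A reduced, illustrative sketch of that contract, not the real class:

    // Reduced sketch of the copy-on-write Record contract (headers omitted).
    final class RecordSketch<K, V> {
        final K key;
        final V value;
        final long timestamp;

        RecordSketch(final K key, final V value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        // Rebuild with a new key; value and timestamp carry over unchanged.
        <K2> RecordSketch<K2, V> withKey(final K2 newKey) {
            return new RecordSketch<>(newKey, value, timestamp);
        }

        // Rebuild with a new value; key and timestamp carry over unchanged.
        <V2> RecordSketch<K, V2> withValue(final V2 newValue) {
            return new RecordSketch<>(key, newValue, timestamp);
        }
    }

This is what lets the listeners above swap in the deserialized key and Change value without ever handling the flush timestamp by hand, unlike the old four-argument apply.]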
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index ac5db68..7bdcea8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -1525,9 +1525,9 @@ public class KStreamImplTest {
 
         final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
 
-        final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
+        final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
+        for (final SourceNode<?, ?> sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
                 assertNull(sourceNode.getTimestampExtractor());
             } else {
@@ -1554,9 +1554,9 @@ public class KStreamImplTest {
 
         final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
 
-        final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
+        final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
+        for (final SourceNode<?, ?> sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
                 assertNull(sourceNode.getTimestampExtractor());
             } else {
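
[Aside: the SourceNode<?, ?, ?, ?> to SourceNode<?, ?> change that starts here recurs through the remaining test diffs. A source node forwards exactly the key/value types it deserializes, so the separate output-type parameters carried no information. A minimal sketch of the reduced surface (not the real class):

    import org.apache.kafka.common.serialization.Deserializer;

    // Minimal sketch: input and output types coincide at a source node,
    // so two type parameters are enough.
    class SourceNodeSketch<K, V> {
        private final Deserializer<K> keyDeserializer;
        private final Deserializer<V> valueDeserializer;

        SourceNodeSketch(final Deserializer<K> keyDeserializer,
                         final Deserializer<V> valueDeserializer) {
            this.keyDeserializer = keyDeserializer;
            this.valueDeserializer = valueDeserializer;
        }

        K deserializeKey(final String topic, final byte[] data) {
            return keyDeserializer.deserialize(topic, data);
        }

        V deserializeValue(final String topic, final byte[] data) {
            return valueDeserializer.deserialize(topic, data);
        }
    }
]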
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 0982337..9ee3347 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -98,7 +98,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     private void setup(final String builtInMetricsVersion, final boolean enableCache) {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime());
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<Object, Object>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index b360151..87d6e87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -36,7 +36,7 @@ public class KTableReduceTest {
 
     @Test
     public void shouldAddAndSubtract() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext();
+        final InternalMockProcessorContext<String, Change<Set<String>>> context = new InternalMockProcessorContext<>();
 
         final Processor<String, Change<Set<String>>> reduceProcessor =
             new KTableReduce<String, Set<String>>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index b25febf..a826d50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
 public class SessionCacheFlushListenerTest {
     @Test
     public void shouldForwardKeyNewValueOldValueAndTimestamp() {
-        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<Windowed<String>, Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 38ef5c6..7c1b0e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.junit.Test;
@@ -31,7 +32,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardValueTimestampIfNewValueExists() {
-        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -42,7 +43,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>(context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
             "key",
             ValueAndTimestamp.make("newValue", 42L),
             ValueAndTimestamp.make("oldValue", 21L),
@@ -53,7 +54,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardParameterTimestampIfNewValueIsNull() {
-        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -64,7 +65,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>(context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
             "key",
             null,
             ValueAndTimestamp.make("oldValue", 21L),
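
[Aside: the (ProcessorContext<String, Change<ValueAndTimestamp<String>>>) casts above suggest TimestampedCacheFlushListener now has constructor overloads for both the old and the new context API, and the mocked InternalProcessorContext satisfies both, so an upcast is needed to pick one. A self-contained illustration of the mechanics, with hypothetical names:

    // Hypothetical names; only the overload-resolution mechanics are the point.
    interface OldContext { }
    interface NewContext<K, V> { }
    interface BothContexts<K, V> extends OldContext, NewContext<K, V> { }

    final class ListenerSketch<K, V> {
        ListenerSketch(final OldContext context) { }
        ListenerSketch(final NewContext<K, V> context) { }

        public static void main(final String[] args) {
            final BothContexts<String, Long> ctx = new BothContexts<String, Long>() { };
            // new ListenerSketch<>(ctx);                          // ambiguous: both overloads apply
            new ListenerSketch<>((NewContext<String, Long>) ctx);  // picks the new-API overload
            new ListenerSketch<String, Long>((OldContext) ctx);    // picks the old-API overload
        }
    }

The same trick appears below in TimestampedTupleForwarderTest, where even a literal null is cast to the new ProcessorContext type so the compiler can choose an overload at all.]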
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 52a5fcf..dc2767c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -39,12 +39,17 @@ public class TimestampedTupleForwarderTest {
 
     private void setFlushListener(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
-        final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
+        final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class);
 
         expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
         replay(store);
 
-        new TimestampedTupleForwarder<>(store, null, flushListener, sendOldValues);
+        new TimestampedTupleForwarder<>(
+            store,
+            (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null,
+            flushListener,
+            sendOldValues
+        );
 
         verify(store);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index b813422..e4968e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -171,7 +171,7 @@ public class AbstractProcessorContextTest {
         );
     }
 
-    private static class TestProcessorContext extends AbstractProcessorContext {
+    private static class TestProcessorContext extends AbstractProcessorContext<Object, Object> {
         static Properties config;
         static {
             config = getStreamsConfig();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index e4bc600..31be9dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -61,10 +61,10 @@ public class GlobalStateTaskTest {
     private final String topic2 = "t2";
     private final TopicPartition t1 = new TopicPartition(topic1, 1);
     private final TopicPartition t2 = new TopicPartition(topic2, 1);
-    private final MockSourceNode<String, String, ?, ?> sourceOne = new MockSourceNode<>(
+    private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(
         new StringDeserializer(),
         new StringDeserializer());
-    private final MockSourceNode<Integer, Integer, ?, ?>  sourceTwo = new MockSourceNode<>(
+    private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode<>(
         new IntegerDeserializer(),
         new IntegerDeserializer());
     private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode<>();
@@ -81,7 +81,7 @@ public class GlobalStateTaskTest {
     @Before
     public void before() {
         final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
-        final Map<String, SourceNode<?, ?, ?, ?>> sourceByTopics = new HashMap<>();
+        final Map<String, SourceNode<?, ?>> sourceByTopics = new HashMap<>();
         sourceByTopics.put(topic1, sourceOne);
         sourceByTopics.put(topic2, sourceTwo);
         final Map<String, String> storeToTopic = new HashMap<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 84bdf51..ef46ab3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -110,8 +110,8 @@ public class ProcessorNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime());
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+        final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
+        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
         node.init(context);
 
         final String threadId = Thread.currentThread().getName();
@@ -196,8 +196,8 @@ public class ProcessorNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
+        final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
+        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
         final StreamsException se = assertThrows(
             StreamsException.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
index b4cef8a..57e4490 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
@@ -27,7 +27,7 @@ public final class ProcessorTopologyFactories {
 
 
     public static ProcessorTopology with(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
-                                         final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
+                                         final Map<String, SourceNode<?, ?>> sourcesByTopic,
                                          final List<StateStore> stateStoresByName,
                                          final Map<String, String> storeToChangelogTopic) {
         return new ProcessorTopology(processorNodes,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 18d17ae..448ceaf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -68,7 +68,7 @@ public class RecordDeserializerTest {
         assertEquals(rawRecord.headers(), record.headers());
     }
 
-    static class TheSourceNode extends SourceNode<Object, Object, Object, Object> {
+    static class TheSourceNode extends SourceNode<Object, Object> {
         private final boolean keyThrowsException;
         private final boolean valueThrowsException;
         private final Object key;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 142e85a..d23311b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -62,11 +62,12 @@ public class RecordQueueTest {
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
 
-    final InternalMockProcessorContext context = new InternalMockProcessorContext(
+    @SuppressWarnings("rawtypes")
+    final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
         StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
         new MockRecordCollector()
     );
-    private final MockSourceNode<Integer, Integer, ?, ?> mockSourceNodeWithMetrics
+    private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
         = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
         new TopicPartition("topic", 1),
@@ -86,6 +87,7 @@ public class RecordQueueTest {
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
+    @SuppressWarnings("unchecked")
     @Before
     public void before() {
         mockSourceNodeWithMetrics.init(context);
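
[Aside: the @SuppressWarnings("rawtypes"/"unchecked") annotations sprinkled through this and the following test files all trace to one cause: InternalMockProcessorContext now takes type parameters (see the last hunk of this diff), so every raw reference to it trips a compiler warning. A tiny stand-in illustration:

    // Stand-in for the newly generified InternalMockProcessorContext.
    class ContextSketch<KOut, VOut> { }

    class WarningDemo {
        @SuppressWarnings("rawtypes")
        final ContextSketch raw = new ContextSketch<>();                     // raw reference: rawtypes warning

        final ContextSketch<Object, Object> typed = new ContextSketch<>();  // fully typed: no warning
    }
]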
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 30c7b1b..7e7f7b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -33,13 +33,13 @@ public class SinkNodeTest {
     private final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
     private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
     private final RecordCollector recordCollector = new MockRecordCollector();
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, recordCollector);
-    private final SinkNode<byte[], byte[], ?, ?> sink = new SinkNode<>("anyNodeName",
+    private final InternalMockProcessorContext<Void, Void> context = new InternalMockProcessorContext<>(anyStateSerde, recordCollector);
+    private final SinkNode<byte[], byte[]> sink = new SinkNode<>("anyNodeName",
             new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null);
 
     // Used to verify that the correct exceptions are thrown if the compiler checks are bypassed
-    @SuppressWarnings("unchecked")
-    private final SinkNode<Object, Object, ?, ?> illTypedSink = (SinkNode) sink;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private final SinkNode<Object, Object> illTypedSink = (SinkNode) sink;
 
     @Before
     public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 92e4719..00c5647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class SourceNodeTest {
     @Test
     public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
-        final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializeKey, is("topic" + headers + "data"));
@@ -52,7 +52,7 @@ public class SourceNodeTest {
 
     @Test
     public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
-        final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializedValue, is("topic" + headers + "data"));
@@ -84,8 +84,8 @@ public class SourceNodeTest {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime());
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
-        final SourceNode<String, String, ?, ?> node =
+        final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
+        final SourceNode<String, String> node =
             new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
         node.init(context);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9763c4f..0e0f314 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -137,9 +137,9 @@ public class StreamTaskTest {
     private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
     private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
 
-    private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Record<Integer, Integer> record) {
             throw new RuntimeException("KABOOM!");
@@ -150,7 +150,7 @@ public class StreamTaskTest {
             throw new RuntimeException("KABOOM!");
         }
     };
-    private final MockSourceNode<Integer, Integer, ?, ?> timeoutSource = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Record<Integer, Integer> record) {
             throw new TimeoutException("Kaboom!");
@@ -192,7 +192,7 @@ public class StreamTaskTest {
     };
 
     private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
-                                                           final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
+                                                           final Map<String, SourceNode<?, ?>> sourcesByTopic,
                                                            final Set<String> repartitionTopics) {
         return new ProcessorTopology(processorNodes,
                                      sourcesByTopic,
@@ -204,7 +204,7 @@ public class StreamTaskTest {
     }
 
     private static ProcessorTopology withSources(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
-                                                 final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic) {
+                                                 final Map<String, SourceNode<?, ?>> sourcesByTopic) {
         return new ProcessorTopology(processorNodes,
                                      sourcesByTopic,
                                      emptyMap(),
@@ -622,11 +622,11 @@ public class StreamTaskTest {
         metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
 
         // Create a processor that only forwards even keys to test the metrics at the source and terminal nodes
-        final MockSourceNode<Integer, Integer, Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer, Integer, Integer>(intDeserializer, intDeserializer) {
-            InternalProcessorContext context;
+        final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+            InternalProcessorContext<Integer, Integer> context;
 
             @Override
-            public void init(final InternalProcessorContext context) {
+            public void init(final InternalProcessorContext<Integer, Integer> context) {
                 this.context = context;
                 super.init(context);
             }
@@ -1528,6 +1528,7 @@ public class StreamTaskTest {
         assertFalse(checkpointFile.exists());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
@@ -1568,6 +1569,7 @@ public class StreamTaskTest {
         assertThrows(IllegalStateException.class, () -> task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { }));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
         task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
@@ -2480,7 +2482,7 @@ public class StreamTaskTest {
         );
     }
 
-    private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer, Integer, Integer> sourceNode) {
+    private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer> sourceNode) {
         final ProcessorTopology topology = withSources(
             asList(sourceNode, processorStreamTime),
             singletonMap(topic1, sourceNode)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 38e9860..3f438f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -187,6 +187,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final InternalMockProcessorContext context;
     private final StateSerdes<K, V> stateSerdes;
 
+    @SuppressWarnings("unchecked")
     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 5959a80..e050a21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -49,6 +49,7 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("unchecked")
 public abstract class AbstractKeyValueStoreTest {
 
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
@@ -80,6 +81,7 @@ public abstract class AbstractKeyValueStoreTest {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldNotIncludeDeletedFromRangeResult() {
         store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index b103e98..da1050a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -124,7 +124,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         bytesStore = getBytesStore();
 
         stateDir = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             stateDir,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index ed60837..b005905 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -89,7 +89,7 @@ public abstract class AbstractSessionBytesStoreTest {
     public void setUp() {
         sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long());
         recordCollector = new MockRecordCollector();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
@@ -536,6 +536,7 @@ public abstract class AbstractSessionBytesStoreTest {
         assertFalse(iterator.hasNext());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldRestore() {
         final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 08fcb6d..2f9432a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -101,7 +101,7 @@ public abstract class AbstractWindowBytesStoreTest {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
 
         recordCollector = new MockRecordCollector();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             baseDir,
             Serdes.String(),
             Serdes.Integer(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
index ea4b147..fba59cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.api.Record;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -46,4 +47,15 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[],
             )
         );
     }
+
+    @Override
+    public void apply(Record<byte[], Change<byte[]>> record) {
+        forwarded.put(
+            keyDeserializer.deserialize(null, record.key()),
+            new Change<>(
+                valueDeserializer.deserialize(null, record.value().newValue),
+                valueDeserializer.deserialize(null, record.value().oldValue)
+            )
+        );
+    }
 }
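
[Aside: unlike the Metered* listeners earlier in this diff, the stub's new apply skips the null guards. That is presumably safe because the tests feed it Kafka's built-in deserializers, which map null input bytes to null output (StringDeserializer, for one, returns null for null data), so a Change with a null oldValue deserializes cleanly. A quick check:

    import org.apache.kafka.common.serialization.StringDeserializer;

    public final class NullDeserializeCheck {
        public static void main(final String[] args) {
            final StringDeserializer deserializer = new StringDeserializer();
            // Built-in deserializers return null for null input bytes.
            System.out.println(deserializer.deserialize("topic", null));  // prints null
        }
    }
]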
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 66b13c1..ff78642 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -74,7 +74,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         store = new CachingKeyValueStore(underlyingStore);
         store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(null, null, null, null, cache);
+        context = new InternalMockProcessorContext<>(null, null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         store.init((StateStoreContext) context, null);
     }
@@ -200,7 +200,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         EasyMock.replay(underlyingStore);
         store = new CachingKeyValueStore(underlyingStore);
         cache = EasyMock.niceMock(ThreadCache.class);
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         store.init((StateStoreContext) context, store);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index e584e2c..5885a59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -87,7 +88,7 @@ public class CachingInMemorySessionStoreTest {
         underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope");
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -223,7 +224,7 @@ public class CachingInMemorySessionStoreTest {
         EasyMock.replay(underlyingStore);
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = EasyMock.niceMock(ThreadCache.class);
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        final InternalMockProcessorContext context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -752,5 +753,18 @@ public class CachingInMemorySessionStoreTest {
                         valueDesializer.deserialize(null, oldValue)),
                     timestamp));
         }
+
+        @Override
+        public void apply(Record<byte[], Change<byte[]>> record) {
+            forwarded.add(
+                new KeyValueTimestamp<>(
+                    keyDeserializer.deserialize(null, record.key()),
+                    new Change<>(
+                        valueDesializer.deserialize(null, record.value().newValue),
+                        valueDesializer.deserialize(null, record.value().oldValue)),
+                    record.timestamp()
+                )
+            );
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 7f8a394..224c8bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -92,7 +93,7 @@ public class CachingPersistentSessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final InternalMockProcessorContext context =
-            new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+            new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -209,7 +210,7 @@ public class CachingPersistentSessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = EasyMock.niceMock(ThreadCache.class);
         final InternalMockProcessorContext context =
-            new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+            new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -763,5 +764,18 @@ public class CachingPersistentSessionStoreTest {
                 )
             );
         }
+
+        @Override
+        public void apply(Record<byte[], Change<byte[]>> record) {
+            forwarded.add(
+                new KeyValueTimestamp<>(
+                    keyDeserializer.deserialize(null, record.key()),
+                    new Change<>(
+                        valueDesializer.deserialize(null, record.value().newValue),
+                        valueDesializer.deserialize(null, record.value().oldValue)),
+                    record.timestamp()
+                )
+            );
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 13f1f2b..e434c21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -103,7 +103,7 @@ public class CachingPersistentWindowStoreTest {
         cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
         cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -906,7 +906,7 @@ public class CachingPersistentWindowStoreTest {
         EasyMock.replay(underlyingStore);
         cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
         cache = EasyMock.createNiceMock(ThreadCache.class);
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 255994c..9e2532d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@SuppressWarnings("ALL")
 public class ChangeLoggingKeyValueBytesStoreTest {
 
     private final MockRecordCollector collector = new MockRecordCollector();
@@ -64,7 +65,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     }
 
     private InternalMockProcessorContext mockContext() {
-        return new InternalMockProcessorContext(
+        return new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index 8295f7d..d65d948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -42,6 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@SuppressWarnings("rawtypes")
 public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
 
     private final MockRecordCollector collector = new MockRecordCollector();
@@ -64,7 +65,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
     }
 
     private InternalMockProcessorContext mockContext() {
-        return new InternalMockProcessorContext(
+        return new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 736721a..ca8468b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -76,8 +76,15 @@ public class CompositeReadOnlyKeyValueStoreTest {
             Serdes.String())
             .build();
 
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName),
-            Serdes.String(), Serdes.String()), new MockRecordCollector());
+        @SuppressWarnings("rawtypes") final InternalMockProcessorContext context =
+            new InternalMockProcessorContext<>(
+                new StateSerdes<>(
+                    ProcessorStateManager.storeChangelogTopic("appId", storeName),
+                    Serdes.String(),
+                    Serdes.String()
+                ),
+                new MockRecordCollector()
+            );
         context.setTime(1L);
 
         store.init((StateStoreContext) context, store);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 831e684..87a2063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -80,6 +80,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
         return store;
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldRemoveKeysWithNullValues() {
         store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index a044eda..53057b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -132,6 +132,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
         assertEquals(3, driver.numFlushedEntryRemoved());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testRestoreEvict() {
         store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index 2ef9bad..2a9b5bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -67,6 +67,7 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
         LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldRestore() {
         // should be empty initially
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index aeef8ce..c8f1a0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -55,7 +55,7 @@ public class KeyValueSegmentsTest {
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             stateDirectory,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index c05c1ba..4266751 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -87,7 +87,7 @@ public class MeteredTimestampedWindowStoreTest {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
 
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 538c8d3..b18919a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -132,7 +132,7 @@ public class MeteredWindowStoreTest {
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime());
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index c9e9a8d..14166be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -87,6 +87,7 @@ import static org.junit.Assert.assertTrue;
 import static org.powermock.api.easymock.PowerMock.replay;
 import static org.powermock.api.easymock.PowerMock.verify;
 
+@SuppressWarnings("unchecked")
 public class RocksDBStoreTest {
     private static boolean enableBloomFilters = false;
     final static String DB_NAME = "db-name";
@@ -107,7 +108,7 @@ public class RocksDBStoreTest {
         final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         dir = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             dir,
             Serdes.String(),
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
index 2646c4c..95c88ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
@@ -59,7 +59,7 @@ public class RocksDBTimeOrderedWindowStoreTest {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
 
         recordCollector = new MockRecordCollector();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             baseDir,
             Serdes.String(),
             Serdes.Integer(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 5643cde..fff4ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -491,6 +491,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
         );
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testRestore() throws Exception {
         final long startTime = SEGMENT_INTERVAL * 2;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 97593e7..c7e5924 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -53,9 +53,10 @@ public class SegmentIteratorTest {
 
     private SegmentIterator<KeyValueSegment> iterator = null;
 
+    @SuppressWarnings("rawtypes")
     @Before
     public void before() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index 558f1c9..722cb69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -55,7 +55,7 @@ public class TimestampedSegmentsTest {
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(
+        context = new InternalMockProcessorContext<>(
             stateDirectory,
             Serdes.String(),
             Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 8d39bf3..b19fdf5 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -59,8 +59,8 @@ import java.util.Map;
 
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
 
-public class InternalMockProcessorContext
-    extends AbstractProcessorContext
+public class InternalMockProcessorContext<KOut, VOut>
+    extends AbstractProcessorContext<KOut, VOut>
     implements RecordCollector.Supplier {
 
     private StateManager stateManager = new StateManagerStub();
@@ -290,13 +290,13 @@ public class InternalMockProcessorContext
     public void commit() {}
 
     @Override
-    public <K, V> void forward(final Record<K, V> record) {
+    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record) {
         forward(record, null);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final Record<K, V> record, final String childName) {
+    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record, final String childName) {
         if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
             setTime(record.timestamp());
         }
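
The bounded type parameters above are the substance of this change: once the
context is parameterized as InternalMockProcessorContext<KOut, VOut>, forward
can only accept records whose key and value types narrow to KOut and VOut. A
minimal sketch of the pattern follows, using illustrative names rather than
the actual Kafka classes (only Record is the real API type):

    import org.apache.kafka.streams.processor.api.Record;

    // Sketch of the bounded-forward pattern, assuming a context typed on
    // its output key/value types. The interface and demo class are
    // illustrative, not part of the Kafka codebase.
    interface BoundedForwardingContext<KOut, VOut> {
        // A record may be forwarded only if its types narrow to KOut/VOut,
        // so type mismatches fail at compile time rather than at runtime.
        <K extends KOut, V extends VOut> void forward(Record<K, V> record);
    }

    class BoundedForwardDemo {
        static void demo(final BoundedForwardingContext<CharSequence, Number> context) {
            // Compiles: String narrows to CharSequence, Long narrows to Number.
            context.forward(new Record<>("key", 42L, 0L));

            // Would not compile: Object does not narrow to CharSequence.
            // context.forward(new Record<>(new Object(), 42L, 0L));
        }
    }

This also explains why the test contexts below (MockInternalProcessorContext,
NoOpProcessorContext) are declared with <Object, Object>: every type narrows
to Object, so a maximally permissive fixture can still forward arbitrary
records.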
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 370dca7..c32c136 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -40,7 +40,7 @@ import java.util.Optional;
 import java.util.Properties;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
-public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
 
     private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
     private ProcessorNode currentNode;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index a75c250..4ab4cb8b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -53,7 +53,7 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KOut, VOut> context) {
         super.init(context);
         initialized = true;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 9d22e3b..f52134e 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
+public class MockSourceNode<KIn, VIn> extends SourceNode<KIn, VIn> {
 
     private static final String NAME = "MOCK-SOURCE-";
     private static final AtomicInteger INDEX = new AtomicInteger(1);
@@ -47,7 +47,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
     }
 
     @Override
-    public void init(final InternalProcessorContext context) {
+    public void init(final InternalProcessorContext<KIn, VIn> context) {
         super.init(context);
         initialized = true;
     }
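
Dropping KOut/VOut from MockSourceNode tracks the same simplification in the
production SourceNode: a source node only deserializes records, it never
changes their types, so its output types are by definition its input types.
A sketch of the collapsed shape (the class is illustrative; Deserializer is
the real kafka-clients interface):

    import org.apache.kafka.common.serialization.Deserializer;

    // Sketch: a source node emits exactly what it deserializes, so a
    // single pair of type parameters covers both its input and output.
    abstract class SketchSourceNode<KIn, VIn> {
        private final Deserializer<KIn> keyDeserializer;
        private final Deserializer<VIn> valueDeserializer;

        SketchSourceNode(final Deserializer<KIn> keyDeserializer,
                         final Deserializer<VIn> valueDeserializer) {
            this.keyDeserializer = keyDeserializer;
            this.valueDeserializer = valueDeserializer;
        }

        KIn deserializeKey(final String topic, final byte[] data) {
            return keyDeserializer.deserialize(topic, data);
        }

        VIn deserializeValue(final String topic, final byte[] data) {
            return valueDeserializer.deserialize(topic, data);
        }
    }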
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index a3ec02b..b3243cd 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -44,7 +44,7 @@ import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
-public class NoOpProcessorContext extends AbstractProcessorContext {
+public class NoOpProcessorContext extends AbstractProcessorContext<Object, Object> {
     public boolean initialized;
     @SuppressWarnings("WeakerAccess")
     public Map<Object, Object> forwardedValues = new HashMap<>();

[kafka] 03/05: fix processorSupplier

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 06640c7b4b91cb4799664452aeb05d7ee71d8dc7
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 14:28:13 2021 -0500

    fix processorSupplier
---
 .../org/apache/kafka/streams/kstream/internals/KTableImpl.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a2b8702..9733511 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -124,8 +124,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> {
     private static final String TOPIC_SUFFIX = "-topic";
     private static final String SINK_NAME = "KTABLE-SINK-";
 
-    private final ProcessorSupplier<?, ?> processorSupplier;
-    private final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier;
+    // Temporarily setting the processorSupplier to type Object so that we can transition from the
+    // old ProcessorSupplier to the new api.ProcessorSupplier. This works because all accesses to
+    // this field are guarded by typechecks anyway.
+    private final Object processorSupplier;
 
     private final String queryableStoreName;
 
@@ -141,7 +143,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> {
                       final InternalStreamsBuilder builder) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
         this.processorSupplier = processorSupplier;
-        this.newProcessorSupplier = null;
         this.queryableStoreName = queryableStoreName;
     }
 
@@ -154,8 +155,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> {
                       final GraphNode graphNode,
                       final InternalStreamsBuilder builder) {
         super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
-        this.processorSupplier = null;
-        this.newProcessorSupplier = newProcessorSupplier;
+        this.processorSupplier = newProcessorSupplier;
         this.queryableStoreName = queryableStoreName;
     }
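
The Object-typed field introduced here trades static typing for a single
field that can hold either supplier generation. The guarded access the new
comment refers to typically looks like the following sketch (a hypothetical
class, not a verbatim excerpt from KTableImpl; both ProcessorSupplier types
are the real Kafka interfaces):

    import org.apache.kafka.streams.processor.api.ProcessorSupplier;

    class SupplierBridge {
        // Holds either an old org.apache.kafka.streams.processor.ProcessorSupplier
        // or a new org.apache.kafka.streams.processor.api.ProcessorSupplier.
        private final Object processorSupplier;

        SupplierBridge(final Object processorSupplier) {
            this.processorSupplier = processorSupplier;
        }

        // Every read of the field is guarded by an instanceof check, so the
        // cast is safe by construction: the constructors are the only
        // writers, and each stores one of the two supplier types.
        void describe() {
            if (processorSupplier instanceof ProcessorSupplier) {
                final ProcessorSupplier<?, ?, ?, ?> newSupplier =
                    (ProcessorSupplier<?, ?, ?, ?>) processorSupplier;
                // ... build the topology node with the new API ...
            } else {
                final org.apache.kafka.streams.processor.ProcessorSupplier<?, ?> oldSupplier =
                    (org.apache.kafka.streams.processor.ProcessorSupplier<?, ?>) processorSupplier;
                // ... build the topology node with the old API ...
            }
        }
    }

This keeps the migration mechanical: call sites that already branch on the
supplier type need no second field, and the Object type can disappear once
the old API is removed.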