You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:02:59 UTC

[kafka] 04/05: fixing tests

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 82642732d400465084f42029e54ec703e2242318
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 15:32:31 2021 -0500

    fixing tests
---
 .../streams/kstream/internals/KTableFilter.java    |  4 +-
 .../streams/kstream/internals/KTableImpl.java      |  8 ++++
 .../NewTimestampedCacheFlushListener.java          | 51 ----------------------
 .../internals/SessionCacheFlushListener.java       |  2 +-
 .../internals/TimestampedCacheFlushListener.java   | 27 +++++++++---
 .../internals/TimestampedTupleForwarder.java       | 22 +++++++---
 .../org/apache/kafka/streams/processor/To.java     |  7 +++
 .../streams/processor/internals/ProcessorNode.java |  1 -
 .../state/internals/MeteredKeyValueStore.java      |  4 +-
 .../state/internals/MeteredSessionStore.java       |  4 +-
 .../state/internals/MeteredWindowStore.java        |  4 +-
 .../internals/SessionCacheFlushListenerTest.java   |  2 +-
 .../TimestampedCacheFlushListenerTest.java         |  8 ++--
 .../internals/TimestampedTupleForwarderTest.java   | 23 +++++++---
 .../state/internals/CacheFlushListenerStub.java    |  2 +-
 .../internals/CachingInMemorySessionStoreTest.java |  2 +-
 .../CachingPersistentSessionStoreTest.java         |  2 +-
 17 files changed, 87 insertions(+), 86 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index a23dce4..ecbd4703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -105,7 +105,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
         }
 
         @Override
-        public void process(Record<KIn, Change<VIn>> record) {
+        public void process(final Record<KIn, Change<VIn>> record) {
             final KIn key = record.key();
             final Change<VIn> change = record.value();
 
@@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
         }
 
         @Override
-        public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
             parentGetter.init(context);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9733511..01cd194 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -832,6 +832,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             return new KTableSourceValueGetterSupplier<>(source.queryableName());
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
+        } else if (processorSupplier instanceof KTableNewProcessorSupplier){
+            return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
         } else {
             return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
         }
@@ -848,6 +850,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                 source.enableSendingOldValues();
             } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
                 ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
+            } else if (processorSupplier instanceof KTableNewProcessorSupplier) {
+                final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier =
+                    (KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier;
+                if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
+                    return false;
+                }
             } else {
                 final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier;
                 if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
deleted file mode 100644
index d325459..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.internals.CacheFlushListener;
-
-class NewTimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
-    private final InternalProcessorContext<KOut, Change<VOut>> context;
-
-    @SuppressWarnings("rawtypes")
-    private final ProcessorNode myNode;
-
-    NewTimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) {
-        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
-        myNode = this.context.currentNode();
-    }
-
-    @Override
-    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
-        throw new RuntimeException("ASDFASDF");
-    }
-
-    @Override
-    public void apply(Record<KOut, Change<VOut>> record) {
-        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
-        context.setCurrentNode(myNode);
-        try {
-            context.forward(record);
-        } finally {
-            context.setCurrentNode(prev);
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index ceff4b7..2792dd9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -51,7 +51,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window
     }
 
     @Override
-    public void apply(Record<Windowed<KOut>, Change<VOut>> record) {
+    public void apply(final Record<Windowed<KOut>, Change<VOut>> record) {
         @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 97ef6cb..4034414 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -24,7 +24,9 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, ValueAndTimestamp<VOut>> {
     private final InternalProcessorContext<KOut, Change<VOut>> context;
 
     @SuppressWarnings("rawtypes")
@@ -42,22 +44,35 @@ class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KO
     }
 
     @Override
-    public void apply(Record<KOut, Change<VOut>> record) {
-        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
+    public void apply(final KOut key,
+                      final ValueAndTimestamp<VOut> newValue,
+                      final ValueAndTimestamp<VOut> oldValue,
+                      final long timestamp) {
+        final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(record);
+            context.forward(
+                key,
+                new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
+                To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
         } finally {
             context.setCurrentNode(prev);
         }
     }
 
     @Override
-    public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
+    public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> record) {
         @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
+            context.forward(
+                record.withValue(
+                    new Change<>(
+                        getValueOrNull(record.value().newValue),
+                        getValueOrNull(record.value().oldValue)
+                    )
+                )
+            );
         } finally {
             context.setCurrentNode(prev);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 729e9fd..e5733eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  */
 class TimestampedTupleForwarder<K, V> {
     private final InternalProcessorContext<K, Change<V>> context;
+    private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -41,6 +42,7 @@ class TimestampedTupleForwarder<K, V> {
                               final TimestampedCacheFlushListener<K, V> flushListener,
                               final boolean sendOldValues) {
         this.context = (InternalProcessorContext<K, Change<V>>) context;
+        this.sendOldValues = sendOldValues;
         cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
@@ -50,24 +52,34 @@ class TimestampedTupleForwarder<K, V> {
                               final TimestampedCacheFlushListener<K, V> flushListener,
                               final boolean sendOldValues) {
         this.context = (InternalProcessorContext) context;
+        this.sendOldValues = sendOldValues;
         cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
     public void maybeForward(final Record<K, Change<V>> record) {
         if (!cachingEnabled) {
-            context.forward(record);
+            if(sendOldValues) {
+                context.forward(record);
+            } else {
+                context.forward(record.withValue(new Change<>(record.value().newValue, null)));
+            }
         }
     }
 
-    public void maybeForward(K key, V value, V oldValue) {
+    public void maybeForward(final K key,
+                             final V newValue,
+                             final V oldValue) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(value, oldValue));
+            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
         }
     }
 
-    public void maybeForward(K key, V value, V oldValue, long newTimestamp) {
+    public void maybeForward(final K key,
+                             final V newValue,
+                             final V oldValue,
+                             final long timestamp) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(value, oldValue), To.all().withTimestamp(newTimestamp));
+            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(timestamp));
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
index fe19dbf..69c0c5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -89,4 +89,11 @@ public class To {
         throw new UnsupportedOperationException("To is unsafe for use in Hash collections");
     }
 
+    @Override
+    public String toString() {
+        return "To{" +
+               "childName='" + childName + '\'' +
+               ", timestamp=" + timestamp +
+               '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 80fdc4f..f221c57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 62542e6..94624a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -177,7 +177,7 @@ public class MeteredKeyValueStore<K, V>
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
                 new CacheFlushListener<byte[], byte[]>() {
                     @Override
-                    public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) {
+                    public void apply(final byte[] rawKey, final byte[] rawNewValue, final byte[] rawOldValue, final long timestamp) {
                         listener.apply(
                             serdes.keyFrom(rawKey),
                             rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
@@ -187,7 +187,7 @@ public class MeteredKeyValueStore<K, V>
                     }
 
                     @Override
-                    public void apply(Record<byte[], Change<byte[]>> record) {
+                    public void apply(final Record<byte[], Change<byte[]>> record) {
                         listener.apply(
                             record.withKey(serdes.keyFrom(record.key()))
                             .withValue(new Change<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index d305951..3b858e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -148,7 +148,7 @@ public class MeteredSessionStore<K, V>
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
                 new CacheFlushListener<byte[], byte[]>() {
                     @Override
-                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                    public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) {
                         listener.apply(
                             SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
                             newValue != null ? serdes.valueFrom(newValue) : null,
@@ -158,7 +158,7 @@ public class MeteredSessionStore<K, V>
                     }
 
                     @Override
-                    public void apply(Record<byte[], Change<byte[]>> record) {
+                    public void apply(final Record<byte[], Change<byte[]>> record) {
                         listener.apply(
                             record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic()))
                                   .withValue(new Change<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 82f65a6..f77bf4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -152,7 +152,7 @@ public class MeteredWindowStore<K, V>
             return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
                 new CacheFlushListener<byte[], byte[]>() {
                     @Override
-                    public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+                    public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) {
                         listener.apply(
                             WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
                             newValue != null ? serdes.valueFrom(newValue) : null,
@@ -162,7 +162,7 @@ public class MeteredWindowStore<K, V>
                     }
 
                     @Override
-                    public void apply(Record<byte[], Change<byte[]>> record) {
+                    public void apply(final Record<byte[], Change<byte[]>> record) {
                         listener.apply(
                             record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
                                   .withValue(new Change<>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index a826d50..c60bcf4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
 public class SessionCacheFlushListenerTest {
     @Test
     public void shouldForwardKeyNewValueOldValueAndTimestamp() {
-        final InternalProcessorContext<Windowed<String>,Change<String>> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<Windowed<String>, Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 7c1b0e7..7c25b2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -32,7 +32,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardValueTimestampIfNewValueExists() {
-        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -43,7 +43,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply(
             "key",
             ValueAndTimestamp.make("newValue", 42L),
             ValueAndTimestamp.make("oldValue", 21L),
@@ -54,7 +54,7 @@ public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardParameterTimestampIfNewValueIsNull() {
-        final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
         expect(context.currentNode()).andReturn(null).anyTimes();
         context.setCurrentNode(null);
         context.setCurrentNode(null);
@@ -65,7 +65,7 @@ public class TimestampedCacheFlushListenerTest {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply(
             "key",
             null,
             ValueAndTimestamp.make("oldValue", 21L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index dc2767c..8b9dccb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.junit.Test;
@@ -39,14 +40,14 @@ public class TimestampedTupleForwarderTest {
 
     private void setFlushListener(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
-        final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class);
+        final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
 
         expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
         replay(store);
 
         new TimestampedTupleForwarder<>(
             store,
-            (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null,
+            (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<Object>>) null,
             flushListener,
             sendOldValues
         );
@@ -62,7 +63,7 @@ public class TimestampedTupleForwarderTest {
 
     private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext context = mock(ProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
 
         expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
         if (sendOldValues) {
@@ -76,7 +77,12 @@ public class TimestampedTupleForwarderTest {
         replay(store, context);
 
         final TimestampedTupleForwarder<String, String> forwarder =
-            new TimestampedTupleForwarder<>(store, context, null, sendOldValues);
+            new TimestampedTupleForwarder<>(
+                store,
+                (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
+                null,
+                sendOldValues
+            );
         forwarder.maybeForward("key1", "newValue1", "oldValue1");
         forwarder.maybeForward("key2", "newValue2", "oldValue2", 42L);
 
@@ -86,13 +92,18 @@ public class TimestampedTupleForwarderTest {
     @Test
     public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
-        final ProcessorContext context = mock(ProcessorContext.class);
+        final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
 
         expect(store.setFlushListener(null, false)).andReturn(true);
         replay(store, context);
 
         final TimestampedTupleForwarder<String, String> forwarder =
-            new TimestampedTupleForwarder<>(store, context, null, false);
+            new TimestampedTupleForwarder<>(
+                store,
+                (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
+                null,
+                false
+            );
         forwarder.maybeForward("key", "newValue", "oldValue");
         forwarder.maybeForward("key", "newValue", "oldValue", 42L);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
index fba59cf..b214739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
@@ -49,7 +49,7 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[],
     }
 
     @Override
-    public void apply(Record<byte[], Change<byte[]>> record) {
+    public void apply(final Record<byte[], Change<byte[]>> record) {
         forwarded.put(
             keyDeserializer.deserialize(null, record.key()),
             new Change<>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index 5885a59..417b35f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -755,7 +755,7 @@ public class CachingInMemorySessionStoreTest {
         }
 
         @Override
-        public void apply(Record<byte[], Change<byte[]>> record) {
+        public void apply(final Record<byte[], Change<byte[]>> record) {
             forwarded.add(
                 new KeyValueTimestamp<>(
                     keyDeserializer.deserialize(null, record.key()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 224c8bd..55018bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -766,7 +766,7 @@ public class CachingPersistentSessionStoreTest {
         }
 
         @Override
-        public void apply(Record<byte[], Change<byte[]>> record) {
+        public void apply(final Record<byte[], Change<byte[]>> record) {
             forwarded.add(
                 new KeyValueTimestamp<>(
                     keyDeserializer.deserialize(null, record.key()),