You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:02:59 UTC
[kafka] 04/05: fixing tests
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 82642732d400465084f42029e54ec703e2242318
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 15:32:31 2021 -0500
fixing tests
---
.../streams/kstream/internals/KTableFilter.java | 4 +-
.../streams/kstream/internals/KTableImpl.java | 8 ++++
.../NewTimestampedCacheFlushListener.java | 51 ----------------------
.../internals/SessionCacheFlushListener.java | 2 +-
.../internals/TimestampedCacheFlushListener.java | 27 +++++++++---
.../internals/TimestampedTupleForwarder.java | 22 +++++++---
.../org/apache/kafka/streams/processor/To.java | 7 +++
.../streams/processor/internals/ProcessorNode.java | 1 -
.../state/internals/MeteredKeyValueStore.java | 4 +-
.../state/internals/MeteredSessionStore.java | 4 +-
.../state/internals/MeteredWindowStore.java | 4 +-
.../internals/SessionCacheFlushListenerTest.java | 2 +-
.../TimestampedCacheFlushListenerTest.java | 8 ++--
.../internals/TimestampedTupleForwarderTest.java | 23 +++++++---
.../state/internals/CacheFlushListenerStub.java | 2 +-
.../internals/CachingInMemorySessionStoreTest.java | 2 +-
.../CachingPersistentSessionStoreTest.java | 2 +-
17 files changed, 87 insertions(+), 86 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index a23dce4..ecbd4703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -105,7 +105,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
}
@Override
- public void process(Record<KIn, Change<VIn>> record) {
+ public void process(final Record<KIn, Change<VIn>> record) {
final KIn key = record.key();
final Change<VIn> change = record.value();
@@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
}
@Override
- public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
+ public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
parentGetter.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9733511..01cd194 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -832,6 +832,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
return new KTableSourceValueGetterSupplier<>(source.queryableName());
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
+ } else if (processorSupplier instanceof KTableNewProcessorSupplier){
+ return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
} else {
return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
}
@@ -848,6 +850,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
source.enableSendingOldValues();
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
+ } else if (processorSupplier instanceof KTableNewProcessorSupplier) {
+ final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier =
+ (KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier;
+ if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
+ return false;
+ }
} else {
final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier;
if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
deleted file mode 100644
index d325459..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NewTimestampedCacheFlushListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.internals.CacheFlushListener;
-
-class NewTimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
- private final InternalProcessorContext<KOut, Change<VOut>> context;
-
- @SuppressWarnings("rawtypes")
- private final ProcessorNode myNode;
-
- NewTimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>> context) {
- this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
- myNode = this.context.currentNode();
- }
-
- @Override
- public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
- throw new RuntimeException("ASDFASDF");
- }
-
- @Override
- public void apply(Record<KOut, Change<VOut>> record) {
- @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
- context.setCurrentNode(myNode);
- try {
- context.forward(record);
- } finally {
- context.setCurrentNode(prev);
- }
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index ceff4b7..2792dd9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -51,7 +51,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window
}
@Override
- public void apply(Record<Windowed<KOut>, Change<VOut>> record) {
+ public void apply(final Record<Windowed<KOut>, Change<VOut>> record) {
@SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 97ef6cb..4034414 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -24,7 +24,9 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
-class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, ValueAndTimestamp<VOut>> {
private final InternalProcessorContext<KOut, Change<VOut>> context;
@SuppressWarnings("rawtypes")
@@ -42,22 +44,35 @@ class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KO
}
@Override
- public void apply(Record<KOut, Change<VOut>> record) {
- @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
+ public void apply(final KOut key,
+ final ValueAndTimestamp<VOut> newValue,
+ final ValueAndTimestamp<VOut> oldValue,
+ final long timestamp) {
+ final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
- context.forward(record);
+ context.forward(
+ key,
+ new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
+ To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
} finally {
context.setCurrentNode(prev);
}
}
@Override
- public void apply(KOut key, VOut newValue, VOut oldValue, long timestamp) {
+ public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> record) {
@SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
- context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
+ context.forward(
+ record.withValue(
+ new Change<>(
+ getValueOrNull(record.value().newValue),
+ getValueOrNull(record.value().oldValue)
+ )
+ )
+ );
} finally {
context.setCurrentNode(prev);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 729e9fd..e5733eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
*/
class TimestampedTupleForwarder<K, V> {
private final InternalProcessorContext<K, Change<V>> context;
+ private final boolean sendOldValues;
private final boolean cachingEnabled;
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -41,6 +42,7 @@ class TimestampedTupleForwarder<K, V> {
final TimestampedCacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
this.context = (InternalProcessorContext<K, Change<V>>) context;
+ this.sendOldValues = sendOldValues;
cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
}
@@ -50,24 +52,34 @@ class TimestampedTupleForwarder<K, V> {
final TimestampedCacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
this.context = (InternalProcessorContext) context;
+ this.sendOldValues = sendOldValues;
cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
}
public void maybeForward(final Record<K, Change<V>> record) {
if (!cachingEnabled) {
- context.forward(record);
+ if(sendOldValues) {
+ context.forward(record);
+ } else {
+ context.forward(record.withValue(new Change<>(record.value().newValue, null)));
+ }
}
}
- public void maybeForward(K key, V value, V oldValue) {
+ public void maybeForward(final K key,
+ final V newValue,
+ final V oldValue) {
if (!cachingEnabled) {
- context.forward(key, new Change<>(value, oldValue));
+ context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
}
}
- public void maybeForward(K key, V value, V oldValue, long newTimestamp) {
+ public void maybeForward(final K key,
+ final V newValue,
+ final V oldValue,
+ final long timestamp) {
if (!cachingEnabled) {
- context.forward(key, new Change<>(value, oldValue), To.all().withTimestamp(newTimestamp));
+ context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(timestamp));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
index fe19dbf..69c0c5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -89,4 +89,11 @@ public class To {
throw new UnsupportedOperationException("To is unsafe for use in Hash collections");
}
+ @Override
+ public String toString() {
+ return "To{" +
+ "childName='" + childName + '\'' +
+ ", timestamp=" + timestamp +
+ '}';
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 80fdc4f..f221c57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 62542e6..94624a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -177,7 +177,7 @@ public class MeteredKeyValueStore<K, V>
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
new CacheFlushListener<byte[], byte[]>() {
@Override
- public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) {
+ public void apply(final byte[] rawKey, final byte[] rawNewValue, final byte[] rawOldValue, final long timestamp) {
listener.apply(
serdes.keyFrom(rawKey),
rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
@@ -187,7 +187,7 @@ public class MeteredKeyValueStore<K, V>
}
@Override
- public void apply(Record<byte[], Change<byte[]>> record) {
+ public void apply(final Record<byte[], Change<byte[]>> record) {
listener.apply(
record.withKey(serdes.keyFrom(record.key()))
.withValue(new Change<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index d305951..3b858e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -148,7 +148,7 @@ public class MeteredSessionStore<K, V>
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
new CacheFlushListener<byte[], byte[]>() {
@Override
- public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+ public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) {
listener.apply(
SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
newValue != null ? serdes.valueFrom(newValue) : null,
@@ -158,7 +158,7 @@ public class MeteredSessionStore<K, V>
}
@Override
- public void apply(Record<byte[], Change<byte[]>> record) {
+ public void apply(final Record<byte[], Change<byte[]>> record) {
listener.apply(
record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic()))
.withValue(new Change<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 82f65a6..f77bf4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -152,7 +152,7 @@ public class MeteredWindowStore<K, V>
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
new CacheFlushListener<byte[], byte[]>() {
@Override
- public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+ public void apply(final byte[] key, final byte[] newValue, final byte[] oldValue, final long timestamp) {
listener.apply(
WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
newValue != null ? serdes.valueFrom(newValue) : null,
@@ -162,7 +162,7 @@ public class MeteredWindowStore<K, V>
}
@Override
- public void apply(Record<byte[], Change<byte[]>> record) {
+ public void apply(final Record<byte[], Change<byte[]>> record) {
listener.apply(
record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
.withValue(new Change<>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index a826d50..c60bcf4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
public class SessionCacheFlushListenerTest {
@Test
public void shouldForwardKeyNewValueOldValueAndTimestamp() {
- final InternalProcessorContext<Windowed<String>,Change<String>> context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext<Windowed<String>, Change<String>> context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 7c1b0e7..7c25b2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -32,7 +32,7 @@ public class TimestampedCacheFlushListenerTest {
@Test
public void shouldForwardValueTimestampIfNewValueExists() {
- final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
@@ -43,7 +43,7 @@ public class TimestampedCacheFlushListenerTest {
expectLastCall();
replay(context);
- new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
+ new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply(
"key",
ValueAndTimestamp.make("newValue", 42L),
ValueAndTimestamp.make("oldValue", 21L),
@@ -54,7 +54,7 @@ public class TimestampedCacheFlushListenerTest {
@Test
public void shouldForwardParameterTimestampIfNewValueIsNull() {
- final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
@@ -65,7 +65,7 @@ public class TimestampedCacheFlushListenerTest {
expectLastCall();
replay(context);
- new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
+ new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply(
"key",
null,
ValueAndTimestamp.make("oldValue", 21L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index dc2767c..8b9dccb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.junit.Test;
@@ -39,14 +40,14 @@ public class TimestampedTupleForwarderTest {
private void setFlushListener(final boolean sendOldValues) {
final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
- final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class);
+ final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
replay(store);
new TimestampedTupleForwarder<>(
store,
- (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null,
+ (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<Object>>) null,
flushListener,
sendOldValues
);
@@ -62,7 +63,7 @@ public class TimestampedTupleForwarderTest {
private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext context = mock(ProcessorContext.class);
+ final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
if (sendOldValues) {
@@ -76,7 +77,12 @@ public class TimestampedTupleForwarderTest {
replay(store, context);
final TimestampedTupleForwarder<String, String> forwarder =
- new TimestampedTupleForwarder<>(store, context, null, sendOldValues);
+ new TimestampedTupleForwarder<>(
+ store,
+ (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
+ null,
+ sendOldValues
+ );
forwarder.maybeForward("key1", "newValue1", "oldValue1");
forwarder.maybeForward("key2", "newValue2", "oldValue2", 42L);
@@ -86,13 +92,18 @@ public class TimestampedTupleForwarderTest {
@Test
public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext context = mock(ProcessorContext.class);
+ final InternalProcessorContext<String, Change<String>> context = mock(InternalProcessorContext.class);
expect(store.setFlushListener(null, false)).andReturn(true);
replay(store, context);
final TimestampedTupleForwarder<String, String> forwarder =
- new TimestampedTupleForwarder<>(store, context, null, false);
+ new TimestampedTupleForwarder<>(
+ store,
+ (org.apache.kafka.streams.processor.api.ProcessorContext<String, Change<String>>) context,
+ null,
+ false
+ );
forwarder.maybeForward("key", "newValue", "oldValue");
forwarder.maybeForward("key", "newValue", "oldValue", 42L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
index fba59cf..b214739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
@@ -49,7 +49,7 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[],
}
@Override
- public void apply(Record<byte[], Change<byte[]>> record) {
+ public void apply(final Record<byte[], Change<byte[]>> record) {
forwarded.put(
keyDeserializer.deserialize(null, record.key()),
new Change<>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index 5885a59..417b35f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -755,7 +755,7 @@ public class CachingInMemorySessionStoreTest {
}
@Override
- public void apply(Record<byte[], Change<byte[]>> record) {
+ public void apply(final Record<byte[], Change<byte[]>> record) {
forwarded.add(
new KeyValueTimestamp<>(
keyDeserializer.deserialize(null, record.key()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 224c8bd..55018bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -766,7 +766,7 @@ public class CachingPersistentSessionStoreTest {
}
@Override
- public void apply(Record<byte[], Change<byte[]>> record) {
+ public void apply(final Record<byte[], Change<byte[]>> record) {
forwarded.add(
new KeyValueTimestamp<>(
keyDeserializer.deserialize(null, record.key()),