You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this version.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:02:57 UTC
[kafka] 02/05: builds
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit ddd0f5d0dd801bbcf9b0c7206e67a8dcfd4db7a4
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 14:25:24 2021 -0500
builds
---
.../streams/kstream/internals/KTableFilter.java | 2 +-
.../kstream/internals/KTableValueGetter.java | 4 +--
.../internals/TimestampedCacheFlushListener.java | 1 +
.../ForeignJoinSubscriptionProcessorSupplier.java | 2 +-
.../SubscriptionStoreReceiveProcessorSupplier.java | 2 +-
.../internals/AbstractProcessorContext.java | 2 +-
.../internals/GlobalProcessorContextImpl.java | 2 +-
.../processor/internals/GlobalStateUpdateTask.java | 5 ++--
.../processor/internals/ProcessorContextImpl.java | 2 +-
.../state/internals/MeteredKeyValueStore.java | 30 +++++++++++++++++-----
.../state/internals/MeteredSessionStore.java | 30 +++++++++++++++++-----
.../state/internals/MeteredWindowStore.java | 30 +++++++++++++++++-----
.../streams/kstream/internals/KStreamImplTest.java | 8 +++---
...KStreamSessionWindowAggregateProcessorTest.java | 2 +-
.../kstream/internals/KTableReduceTest.java | 2 +-
.../internals/SessionCacheFlushListenerTest.java | 2 +-
.../TimestampedCacheFlushListenerTest.java | 9 ++++---
.../internals/TimestampedTupleForwarderTest.java | 9 +++++--
.../internals/AbstractProcessorContextTest.java | 2 +-
.../processor/internals/GlobalStateTaskTest.java | 6 ++---
.../processor/internals/ProcessorNodeTest.java | 8 +++---
.../internals/ProcessorTopologyFactories.java | 2 +-
.../internals/RecordDeserializerTest.java | 2 +-
.../processor/internals/RecordQueueTest.java | 6 +++--
.../streams/processor/internals/SinkNodeTest.java | 8 +++---
.../processor/internals/SourceNodeTest.java | 8 +++---
.../processor/internals/StreamTaskTest.java | 22 ++++++++--------
.../streams/state/KeyValueStoreTestDriver.java | 1 +
.../state/internals/AbstractKeyValueStoreTest.java | 2 ++
.../AbstractRocksDBSegmentedBytesStoreTest.java | 2 +-
.../internals/AbstractSessionBytesStoreTest.java | 3 ++-
.../internals/AbstractWindowBytesStoreTest.java | 2 +-
.../state/internals/CacheFlushListenerStub.java | 12 +++++++++
.../CachingInMemoryKeyValueStoreTest.java | 4 +--
.../internals/CachingInMemorySessionStoreTest.java | 18 +++++++++++--
.../CachingPersistentSessionStoreTest.java | 18 +++++++++++--
.../CachingPersistentWindowStoreTest.java | 4 +--
.../ChangeLoggingKeyValueBytesStoreTest.java | 3 ++-
...geLoggingTimestampedKeyValueBytesStoreTest.java | 3 ++-
.../CompositeReadOnlyKeyValueStoreTest.java | 11 ++++++--
.../state/internals/InMemoryKeyValueStoreTest.java | 1 +
.../state/internals/InMemoryLRUCacheStoreTest.java | 1 +
.../state/internals/InMemoryWindowStoreTest.java | 1 +
.../state/internals/KeyValueSegmentsTest.java | 2 +-
.../MeteredTimestampedWindowStoreTest.java | 2 +-
.../state/internals/MeteredWindowStoreTest.java | 2 +-
.../streams/state/internals/RocksDBStoreTest.java | 3 ++-
.../RocksDBTimeOrderedWindowStoreTest.java | 2 +-
.../state/internals/RocksDBWindowStoreTest.java | 1 +
.../state/internals/SegmentIteratorTest.java | 3 ++-
.../state/internals/TimestampedSegmentsTest.java | 2 +-
.../kafka/test/InternalMockProcessorContext.java | 8 +++---
.../kafka/test/MockInternalProcessorContext.java | 2 +-
.../org/apache/kafka/test/MockProcessorNode.java | 2 +-
.../java/org/apache/kafka/test/MockSourceNode.java | 4 +--
.../apache/kafka/test/NoOpProcessorContext.java | 2 +-
56 files changed, 227 insertions(+), 102 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index bbffea6..a23dce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -166,7 +166,7 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
parentGetter.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index c939234..12145fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
public interface KTableValueGetter<K, V> {
- void init(ProcessorContext<Void, Void> context);
+ void init(ProcessorContext context);
ValueAndTimestamp<V> get(K key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index 6dbf435..97ef6cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
class TimestampedCacheFlushListener<KOut, VOut> implements CacheFlushListener<KOut, VOut> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index fd95105..3c12afa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -63,7 +63,7 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements Proc
@Override
public void init(final ProcessorContext context) {
super.init(context);
- final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+ final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 61fb1c1..98bcd4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -60,7 +60,7 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
@Override
public void init(final ProcessorContext context) {
super.init(context);
- final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+ final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 37ffbdc..79b2d0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -34,7 +34,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-public abstract class AbstractProcessorContext implements InternalProcessorContext<Object, Object> {
+public abstract class AbstractProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut, VOut> {
private final TaskId taskId;
private final String applicationId;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index be3cf55..dbdd6a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -34,7 +34,7 @@ import java.time.Duration;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
-public class GlobalProcessorContextImpl extends AbstractProcessorContext {
+public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> {
private final GlobalStateManager stateManager;
private final Time time;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 6b1378b..e5f591c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -69,7 +69,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
for (final String storeName : storeNames) {
final String sourceTopic = storeNameToTopic.get(storeName);
- final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic);
+ final SourceNode<?, ?> source = topology.source(sourceTopic);
deserializers.put(
sourceTopic,
new RecordDeserializer(
@@ -111,7 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
processorContext.timestamp(),
processorContext.headers()
);
- ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
+ ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
}
offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
@@ -138,6 +138,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
}
}
+ @SuppressWarnings("unchecked")
private void initTopology() {
for (final ProcessorNode<?, ?, ?, ?> node : this.topology.processors()) {
processorContext.setCurrentNode(node);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index dcae2ab..bd7ece4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -42,7 +42,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDur
import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
-public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {
// the below are null for standby tasks
private StreamTask streamTask;
private RecordCollector collector;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 18c44e8..62542e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -173,12 +175,28 @@ public class MeteredKeyValueStore<K, V>
final KeyValueStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
- (rawKey, rawNewValue, rawOldValue, timestamp) -> listener.apply(
- serdes.keyFrom(rawKey),
- rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
- rawOldValue != null ? serdes.valueFrom(rawOldValue) : null,
- timestamp
- ),
+ new CacheFlushListener<byte[], byte[]>() {
+ @Override
+ public void apply(byte[] rawKey, byte[] rawNewValue, byte[] rawOldValue, long timestamp) {
+ listener.apply(
+ serdes.keyFrom(rawKey),
+ rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
+ rawOldValue != null ? serdes.valueFrom(rawOldValue) : null,
+ timestamp
+ );
+ }
+
+ @Override
+ public void apply(Record<byte[], Change<byte[]>> record) {
+ listener.apply(
+ record.withKey(serdes.keyFrom(record.key()))
+ .withValue(new Change<>(
+ record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+ record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+ ))
+ );
+ }
+ },
sendOldValues);
}
return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 1fbc8db..d305951 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -144,12 +146,28 @@ public class MeteredSessionStore<K, V>
final SessionStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
- (key, newValue, oldValue, timestamp) -> listener.apply(
- SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
- newValue != null ? serdes.valueFrom(newValue) : null,
- oldValue != null ? serdes.valueFrom(oldValue) : null,
- timestamp
- ),
+ new CacheFlushListener<byte[], byte[]>() {
+ @Override
+ public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+ listener.apply(
+ SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()),
+ newValue != null ? serdes.valueFrom(newValue) : null,
+ oldValue != null ? serdes.valueFrom(oldValue) : null,
+ timestamp
+ );
+ }
+
+ @Override
+ public void apply(Record<byte[], Change<byte[]>> record) {
+ listener.apply(
+ record.withKey(SessionKeySchema.from(record.key(), serdes.keyDeserializer(), serdes.topic()))
+ .withValue(new Change<>(
+ record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+ record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+ ))
+ );
+ }
+ },
sendOldValues);
}
return false;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 91b4387..82f65a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -148,12 +150,28 @@ public class MeteredWindowStore<K, V>
final WindowStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener(
- (key, newValue, oldValue, timestamp) -> listener.apply(
- WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
- newValue != null ? serdes.valueFrom(newValue) : null,
- oldValue != null ? serdes.valueFrom(oldValue) : null,
- timestamp
- ),
+ new CacheFlushListener<byte[], byte[]>() {
+ @Override
+ public void apply(byte[] key, byte[] newValue, byte[] oldValue, long timestamp) {
+ listener.apply(
+ WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()),
+ newValue != null ? serdes.valueFrom(newValue) : null,
+ oldValue != null ? serdes.valueFrom(oldValue) : null,
+ timestamp
+ );
+ }
+
+ @Override
+ public void apply(Record<byte[], Change<byte[]>> record) {
+ listener.apply(
+ record.withKey(WindowKeySchema.fromStoreKey(record.key(), windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
+ .withValue(new Change<>(
+ record.value().newValue != null ? serdes.valueFrom(record.value().newValue) : null,
+ record.value().oldValue != null ? serdes.valueFrom(record.value().oldValue) : null
+ ))
+ );
+ }
+ },
sendOldValues);
}
return false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index ac5db68..7bdcea8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -1525,9 +1525,9 @@ public class KStreamImplTest {
final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
- final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
+ final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
- for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
+ for (final SourceNode<?, ?> sourceNode : topology.sources()) {
if (sourceNode.name().equals(originalSourceNode.name())) {
assertNull(sourceNode.getTimestampExtractor());
} else {
@@ -1554,9 +1554,9 @@ public class KStreamImplTest {
final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
- final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
+ final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
- for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
+ for (final SourceNode<?, ?> sourceNode : topology.sources()) {
if (sourceNode.name().equals(originalSourceNode.name())) {
assertNull(sourceNode.getTimestampExtractor());
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 0982337..9ee3347 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -98,7 +98,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
private void setup(final String builtInMetricsVersion, final boolean enableCache) {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime());
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<Object, Object>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index b360151..87d6e87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -36,7 +36,7 @@ public class KTableReduceTest {
@Test
public void shouldAddAndSubtract() {
- final InternalMockProcessorContext context = new InternalMockProcessorContext();
+ final InternalMockProcessorContext<String, Change<Set<String>>> context = new InternalMockProcessorContext<>();
final Processor<String, Change<Set<String>>> reduceProcessor =
new KTableReduce<String, Set<String>>(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index b25febf..a826d50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
public class SessionCacheFlushListenerTest {
@Test
public void shouldForwardKeyNewValueOldValueAndTimestamp() {
- final InternalProcessorContext context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext<Windowed<String>,Change<String>> context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 38ef5c6..7c1b0e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.Test;
@@ -31,7 +32,7 @@ public class TimestampedCacheFlushListenerTest {
@Test
public void shouldForwardValueTimestampIfNewValueExists() {
- final InternalProcessorContext context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
@@ -42,7 +43,7 @@ public class TimestampedCacheFlushListenerTest {
expectLastCall();
replay(context);
- new TimestampedCacheFlushListener<>(context).apply(
+ new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
"key",
ValueAndTimestamp.make("newValue", 42L),
ValueAndTimestamp.make("oldValue", 21L),
@@ -53,7 +54,7 @@ public class TimestampedCacheFlushListenerTest {
@Test
public void shouldForwardParameterTimestampIfNewValueIsNull() {
- final InternalProcessorContext context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext<String, Change<ValueAndTimestamp<String>>> context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
@@ -64,7 +65,7 @@ public class TimestampedCacheFlushListenerTest {
expectLastCall();
replay(context);
- new TimestampedCacheFlushListener<>(context).apply(
+ new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<ValueAndTimestamp<String>>>) context).apply(
"key",
null,
ValueAndTimestamp.make("oldValue", 21L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 52a5fcf..dc2767c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -39,12 +39,17 @@ public class TimestampedTupleForwarderTest {
private void setFlushListener(final boolean sendOldValues) {
final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> store = mock(WrappedStateStore.class);
- final TimestampedCacheFlushListener<Object, Object> flushListener = mock(TimestampedCacheFlushListener.class);
+ final TimestampedCacheFlushListener<Object, ValueAndTimestamp<Object>> flushListener = mock(TimestampedCacheFlushListener.class);
expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
replay(store);
- new TimestampedTupleForwarder<>(store, null, flushListener, sendOldValues);
+ new TimestampedTupleForwarder<>(
+ store,
+ (org.apache.kafka.streams.processor.api.ProcessorContext<Object, Change<ValueAndTimestamp<Object>>>) null,
+ flushListener,
+ sendOldValues
+ );
verify(store);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index b813422..e4968e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -171,7 +171,7 @@ public class AbstractProcessorContextTest {
);
}
- private static class TestProcessorContext extends AbstractProcessorContext {
+ private static class TestProcessorContext extends AbstractProcessorContext<Object, Object> {
static Properties config;
static {
config = getStreamsConfig();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index e4bc600..31be9dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -61,10 +61,10 @@ public class GlobalStateTaskTest {
private final String topic2 = "t2";
private final TopicPartition t1 = new TopicPartition(topic1, 1);
private final TopicPartition t2 = new TopicPartition(topic2, 1);
- private final MockSourceNode<String, String, ?, ?> sourceOne = new MockSourceNode<>(
+ private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(
new StringDeserializer(),
new StringDeserializer());
- private final MockSourceNode<Integer, Integer, ?, ?> sourceTwo = new MockSourceNode<>(
+ private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode<>(
new IntegerDeserializer(),
new IntegerDeserializer());
private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode<>();
@@ -81,7 +81,7 @@ public class GlobalStateTaskTest {
@Before
public void before() {
final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
- final Map<String, SourceNode<?, ?, ?, ?>> sourceByTopics = new HashMap<>();
+ final Map<String, SourceNode<?, ?>> sourceByTopics = new HashMap<>();
sourceByTopics.put(topic1, sourceOne);
sourceByTopics.put(topic2, sourceTwo);
final Map<String, String> storeToTopic = new HashMap<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 84bdf51..ef46ab3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -110,8 +110,8 @@ public class ProcessorNodeTest {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime());
- final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
- final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+ final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
+ final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
node.init(context);
final String threadId = Thread.currentThread().getName();
@@ -196,8 +196,8 @@ public class ProcessorNodeTest {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
- final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
- final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
+ final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
+ final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
node.init(context);
final StreamsException se = assertThrows(
StreamsException.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
index b4cef8a..57e4490 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
@@ -27,7 +27,7 @@ public final class ProcessorTopologyFactories {
public static ProcessorTopology with(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
+ final Map<String, SourceNode<?, ?>> sourcesByTopic,
final List<StateStore> stateStoresByName,
final Map<String, String> storeToChangelogTopic) {
return new ProcessorTopology(processorNodes,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 18d17ae..448ceaf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -68,7 +68,7 @@ public class RecordDeserializerTest {
assertEquals(rawRecord.headers(), record.headers());
}
- static class TheSourceNode extends SourceNode<Object, Object, Object, Object> {
+ static class TheSourceNode extends SourceNode<Object, Object> {
private final boolean keyThrowsException;
private final boolean valueThrowsException;
private final Object key;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 142e85a..d23311b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -62,11 +62,12 @@ public class RecordQueueTest {
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
- final InternalMockProcessorContext context = new InternalMockProcessorContext(
+ @SuppressWarnings("rawtypes")
+ final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
new MockRecordCollector()
);
- private final MockSourceNode<Integer, Integer, ?, ?> mockSourceNodeWithMetrics
+ private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
= new MockSourceNode<>(intDeserializer, intDeserializer);
private final RecordQueue queue = new RecordQueue(
new TopicPartition("topic", 1),
@@ -86,6 +87,7 @@ public class RecordQueueTest {
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
+ @SuppressWarnings("unchecked")
@Before
public void before() {
mockSourceNodeWithMetrics.init(context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 30c7b1b..7e7f7b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -33,13 +33,13 @@ public class SinkNodeTest {
private final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
private final RecordCollector recordCollector = new MockRecordCollector();
- private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, recordCollector);
- private final SinkNode<byte[], byte[], ?, ?> sink = new SinkNode<>("anyNodeName",
+ private final InternalMockProcessorContext<Void, Void> context = new InternalMockProcessorContext<>(anyStateSerde, recordCollector);
+ private final SinkNode<byte[], byte[]> sink = new SinkNode<>("anyNodeName",
new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null);
// Used to verify that the correct exceptions are thrown if the compiler checks are bypassed
- @SuppressWarnings("unchecked")
- private final SinkNode<Object, Object, ?, ?> illTypedSink = (SinkNode) sink;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private final SinkNode<Object, Object> illTypedSink = (SinkNode) sink;
@Before
public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 92e4719..00c5647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertTrue;
public class SourceNodeTest {
@Test
public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
- final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+ final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
final RecordHeaders headers = new RecordHeaders();
final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
assertThat(deserializeKey, is("topic" + headers + "data"));
@@ -52,7 +52,7 @@ public class SourceNodeTest {
@Test
public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
- final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+ final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
final RecordHeaders headers = new RecordHeaders();
final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
assertThat(deserializedValue, is("topic" + headers + "data"));
@@ -84,8 +84,8 @@ public class SourceNodeTest {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime());
- final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
- final SourceNode<String, String, ?, ?> node =
+ final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
+ final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
node.init(context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9763c4f..0e0f314 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -137,9 +137,9 @@ public class StreamTaskTest {
private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
- private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
- private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
- private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
+ private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+ private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+ private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
@Override
public void process(final Record<Integer, Integer> record) {
throw new RuntimeException("KABOOM!");
@@ -150,7 +150,7 @@ public class StreamTaskTest {
throw new RuntimeException("KABOOM!");
}
};
- private final MockSourceNode<Integer, Integer, ?, ?> timeoutSource = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
+ private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
@Override
public void process(final Record<Integer, Integer> record) {
throw new TimeoutException("Kaboom!");
@@ -192,7 +192,7 @@ public class StreamTaskTest {
};
private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
+ final Map<String, SourceNode<?, ?>> sourcesByTopic,
final Set<String> repartitionTopics) {
return new ProcessorTopology(processorNodes,
sourcesByTopic,
@@ -204,7 +204,7 @@ public class StreamTaskTest {
}
private static ProcessorTopology withSources(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic) {
+ final Map<String, SourceNode<?, ?>> sourcesByTopic) {
return new ProcessorTopology(processorNodes,
sourcesByTopic,
emptyMap(),
@@ -622,11 +622,11 @@ public class StreamTaskTest {
metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
// Create a processor that only forwards even keys to test the metrics at the source and terminal nodes
- final MockSourceNode<Integer, Integer, Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer, Integer, Integer>(intDeserializer, intDeserializer) {
- InternalProcessorContext context;
+ final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+ InternalProcessorContext<Integer, Integer> context;
@Override
- public void init(final InternalProcessorContext context) {
+ public void init(final InternalProcessorContext<Integer, Integer> context) {
this.context = context;
super.init(context);
}
@@ -1528,6 +1528,7 @@ public class StreamTaskTest {
assertFalse(checkpointFile.exists());
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
@@ -1568,6 +1569,7 @@ public class StreamTaskTest {
assertThrows(IllegalStateException.class, () -> task.schedule(1, PunctuationType.STREAM_TIME, timestamp -> { }));
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
@@ -2480,7 +2482,7 @@ public class StreamTaskTest {
);
}
- private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer, Integer, Integer> sourceNode) {
+ private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode<Integer, Integer> sourceNode) {
final ProcessorTopology topology = withSources(
asList(sourceNode, processorStreamTime),
singletonMap(topic1, sourceNode)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 38e9860..3f438f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -187,6 +187,7 @@ public class KeyValueStoreTestDriver<K, V> {
private final InternalMockProcessorContext context;
private final StateSerdes<K, V> stateSerdes;
+ @SuppressWarnings("unchecked")
private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 5959a80..e050a21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -49,6 +49,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("unchecked")
public abstract class AbstractKeyValueStoreTest {
protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
@@ -80,6 +81,7 @@ public abstract class AbstractKeyValueStoreTest {
return result;
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldNotIncludeDeletedFromRangeResult() {
store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index b103e98..da1050a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -124,7 +124,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore = getBytesStore();
stateDir = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
stateDir,
Serdes.String(),
Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index ed60837..b005905 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -89,7 +89,7 @@ public abstract class AbstractSessionBytesStoreTest {
public void setUp() {
sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long());
recordCollector = new MockRecordCollector();
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
@@ -536,6 +536,7 @@ public abstract class AbstractSessionBytesStoreTest {
assertFalse(iterator.hasNext());
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldRestore() {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 08fcb6d..2f9432a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -101,7 +101,7 @@ public abstract class AbstractWindowBytesStoreTest {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
recordCollector = new MockRecordCollector();
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
baseDir,
Serdes.String(),
Serdes.Integer(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
index ea4b147..fba59cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.api.Record;
import java.util.HashMap;
import java.util.Map;
@@ -46,4 +47,15 @@ public class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[],
)
);
}
+
+ @Override
+ public void apply(final Record<byte[], Change<byte[]>> record) { // Record-based flush callback of this CacheFlushListener stub
+ forwarded.put( // capture the flushed entry in `forwarded`, keyed by the deserialized key
+ keyDeserializer.deserialize(null, record.key()), // null is passed as the first deserialize argument
+ new Change<>( // first argument is built from newValue, second from oldValue
+ valueDeserializer.deserialize(null, record.value().newValue),
+ valueDeserializer.deserialize(null, record.value().oldValue)
+ )
+ );
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 66b13c1..ff78642 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -74,7 +74,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
store = new CachingKeyValueStore(underlyingStore);
store.setFlushListener(cacheFlushListener, false);
cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
- context = new InternalMockProcessorContext(null, null, null, null, cache);
+ context = new InternalMockProcessorContext<>(null, null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
store.init((StateStoreContext) context, null);
}
@@ -200,7 +200,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
EasyMock.replay(underlyingStore);
store = new CachingKeyValueStore(underlyingStore);
cache = EasyMock.niceMock(ThreadCache.class);
- context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+ context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
store.init((StateStoreContext) context, store);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index e584e2c..5885a59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -87,7 +88,7 @@ public class CachingInMemorySessionStoreTest {
underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope");
cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
- context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+ context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -223,7 +224,7 @@ public class CachingInMemorySessionStoreTest {
EasyMock.replay(underlyingStore);
cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
cache = EasyMock.niceMock(ThreadCache.class);
- final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+ final InternalMockProcessorContext context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -752,5 +753,18 @@ public class CachingInMemorySessionStoreTest {
valueDesializer.deserialize(null, oldValue)),
timestamp));
}
+
+ @Override
+ public void apply(final Record<byte[], Change<byte[]>> record) { // Record-based variant of the flush callback
+ forwarded.add( // append one KeyValueTimestamp per applied record
+ new KeyValueTimestamp<>(
+ keyDeserializer.deserialize(null, record.key()), // null is passed as the first deserialize argument
+ new Change<>( // first argument is built from newValue, second from oldValue
+ valueDesializer.deserialize(null, record.value().newValue), // `valueDesializer` (sic) is the field name used outside this hunk
+ valueDesializer.deserialize(null, record.value().oldValue)),
+ record.timestamp() // timestamp is taken from the record itself
+ )
+ );
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 7f8a394..224c8bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -92,7 +93,7 @@ public class CachingPersistentSessionStoreTest {
cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
final InternalMockProcessorContext context =
- new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+ new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -209,7 +210,7 @@ public class CachingPersistentSessionStoreTest {
cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
cache = EasyMock.niceMock(ThreadCache.class);
final InternalMockProcessorContext context =
- new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+ new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -763,5 +764,18 @@ public class CachingPersistentSessionStoreTest {
)
);
}
+
+ @Override
+ public void apply(final Record<byte[], Change<byte[]>> record) { // Record-based variant of the flush callback
+ forwarded.add( // append one KeyValueTimestamp per applied record
+ new KeyValueTimestamp<>(
+ keyDeserializer.deserialize(null, record.key()), // null is passed as the first deserialize argument
+ new Change<>( // first argument is built from newValue, second from oldValue
+ valueDesializer.deserialize(null, record.value().newValue), // `valueDesializer` (sic) is the field name this test class uses; not renamed here
+ valueDesializer.deserialize(null, record.value().oldValue)),
+ record.timestamp() // timestamp is taken from the record itself
+ )
+ );
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 13f1f2b..e434c21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -103,7 +103,7 @@ public class CachingPersistentWindowStoreTest {
cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
cachingStore.setFlushListener(cacheListener, false);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
- context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+ context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
cachingStore.init((StateStoreContext) context, cachingStore);
}
@@ -906,7 +906,7 @@ public class CachingPersistentWindowStoreTest {
EasyMock.replay(underlyingStore);
cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
cache = EasyMock.createNiceMock(ThreadCache.class);
- context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+ context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
cachingStore.init((StateStoreContext) context, cachingStore);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 255994c..9e2532d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+@SuppressWarnings("ALL")
public class ChangeLoggingKeyValueBytesStoreTest {
private final MockRecordCollector collector = new MockRecordCollector();
@@ -64,7 +65,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
}
private InternalMockProcessorContext mockContext() {
- return new InternalMockProcessorContext(
+ return new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index 8295f7d..d65d948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -42,6 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+@SuppressWarnings("rawtypes")
public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
private final MockRecordCollector collector = new MockRecordCollector();
@@ -64,7 +65,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
}
private InternalMockProcessorContext mockContext() {
- return new InternalMockProcessorContext(
+ return new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 736721a..ca8468b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -76,8 +76,15 @@ public class CompositeReadOnlyKeyValueStoreTest {
Serdes.String())
.build();
- final InternalMockProcessorContext context = new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName),
- Serdes.String(), Serdes.String()), new MockRecordCollector());
+ @SuppressWarnings("rawtypes") final InternalMockProcessorContext context =
+ new InternalMockProcessorContext<>(
+ new StateSerdes<>(
+ ProcessorStateManager.storeChangelogTopic("appId", storeName),
+ Serdes.String(),
+ Serdes.String()
+ ),
+ new MockRecordCollector()
+ );
context.setTime(1L);
store.init((StateStoreContext) context, store);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 831e684..87a2063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -80,6 +80,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
return store;
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldRemoveKeysWithNullValues() {
store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index a044eda..53057b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -132,6 +132,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
assertEquals(3, driver.numFlushedEntryRemoved());
}
+ @SuppressWarnings("unchecked")
@Test
public void testRestoreEvict() {
store.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index 2ef9bad..2a9b5bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -67,6 +67,7 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldRestore() {
// should be empty initially
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index aeef8ce..c8f1a0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -55,7 +55,7 @@ public class KeyValueSegmentsTest {
@Before
public void createContext() {
stateDirectory = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
stateDirectory,
Serdes.String(),
Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index c05c1ba..4266751 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -87,7 +87,7 @@ public class MeteredTimestampedWindowStoreTest {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, new MockTime());
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 538c8d3..b18919a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -132,7 +132,7 @@ public class MeteredWindowStoreTest {
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, new MockTime());
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index c9e9a8d..14166be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -87,6 +87,7 @@ import static org.junit.Assert.assertTrue;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify;
+@SuppressWarnings("unchecked")
public class RocksDBStoreTest {
private static boolean enableBloomFilters = false;
final static String DB_NAME = "db-name";
@@ -107,7 +108,7 @@ public class RocksDBStoreTest {
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
dir = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
dir,
Serdes.String(),
Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
index 2646c4c..95c88ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
@@ -59,7 +59,7 @@ public class RocksDBTimeOrderedWindowStoreTest {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
recordCollector = new MockRecordCollector();
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
baseDir,
Serdes.String(),
Serdes.Integer(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 5643cde..fff4ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -491,6 +491,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
);
}
+ @SuppressWarnings("unchecked")
@Test
public void testRestore() throws Exception {
final long startTime = SEGMENT_INTERVAL * 2;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 97593e7..c7e5924 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -53,9 +53,10 @@ public class SegmentIteratorTest {
private SegmentIterator<KeyValueSegment> iterator = null;
+ @SuppressWarnings("rawtypes")
@Before
public void before() {
- final InternalMockProcessorContext context = new InternalMockProcessorContext(
+ final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index 558f1c9..722cb69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -55,7 +55,7 @@ public class TimestampedSegmentsTest {
@Before
public void createContext() {
stateDirectory = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext(
+ context = new InternalMockProcessorContext<>(
stateDirectory,
Serdes.String(),
Serdes.Long(),
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 8d39bf3..b19fdf5 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -59,8 +59,8 @@ import java.util.Map;
import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
-public class InternalMockProcessorContext
- extends AbstractProcessorContext
+public class InternalMockProcessorContext<KOut, VOut>
+ extends AbstractProcessorContext<KOut, VOut>
implements RecordCollector.Supplier {
private StateManager stateManager = new StateManagerStub();
@@ -290,13 +290,13 @@ public class InternalMockProcessorContext
public void commit() {}
@Override
- public <K, V> void forward(final Record<K, V> record) {
+ public <K extends KOut, V extends VOut> void forward(final Record<K, V> record) {
forward(record, null);
}
@SuppressWarnings("unchecked")
@Override
- public <K, V> void forward(final Record<K, V> record, final String childName) {
+ public <K extends KOut, V extends VOut> void forward(final Record<K, V> record, final String childName) {
if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
setTime(record.timestamp());
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 370dca7..c32c136 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -40,7 +40,7 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
private ProcessorNode currentNode;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index a75c250..4ab4cb8b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -53,7 +53,7 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn,
}
@Override
- public void init(final InternalProcessorContext context) {
+ public void init(final InternalProcessorContext<KOut, VOut> context) {
super.init(context);
initialized = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 9d22e3b..f52134e 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
+public class MockSourceNode<KIn, VIn> extends SourceNode<KIn, VIn> {
private static final String NAME = "MOCK-SOURCE-";
private static final AtomicInteger INDEX = new AtomicInteger(1);
@@ -47,7 +47,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K
}
@Override
- public void init(final InternalProcessorContext context) {
+ public void init(final InternalProcessorContext<KIn, VIn> context) {
super.init(context);
initialized = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index a3ec02b..b3243cd 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -44,7 +44,7 @@ import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-public class NoOpProcessorContext extends AbstractProcessorContext {
+public class NoOpProcessorContext extends AbstractProcessorContext<Object, Object> {
public boolean initialized;
@SuppressWarnings("WeakerAccess")
public Map<Object, Object> forwardedValues = new HashMap<>();