You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/01 19:27:19 UTC
[kafka] branch trunk updated: KAFKA-8410: Revert Part 1: processor context bounds (#8414) (#8595)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fd095aa KAFKA-8410: Revert Part 1: processor context bounds (#8414) (#8595)
fd095aa is described below
commit fd095aaafdd207162cddf293b17f249875b9a532
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 1 14:26:36 2020 -0500
KAFKA-8410: Revert Part 1: processor context bounds (#8414) (#8595)
This reverts commit 29e08fd2c2d3349ba5cbd8fe5a9d35a0cea02b85.
There turned out to be more problems than expected with adding the generic parameters.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../examples/docs/DeveloperGuideTesting.java | 4 +-
.../apache/kafka/streams/kstream/Transformer.java | 2 +-
.../kafka/streams/kstream/ValueTransformer.java | 2 +-
.../streams/kstream/ValueTransformerWithKey.java | 2 +-
.../streams/kstream/internals/AbstractStream.java | 2 +-
.../kstream/internals/KStreamFlatTransform.java | 2 +-
.../internals/KStreamFlatTransformValues.java | 4 +-
.../internals/KStreamKTableJoinProcessor.java | 5 +-
.../kstream/internals/KStreamTransformValues.java | 4 +-
.../kstream/internals/KTableKTableInnerJoin.java | 7 ++-
.../kstream/internals/KTableKTableLeftJoin.java | 7 ++-
.../kstream/internals/KTableKTableOuterJoin.java | 7 ++-
.../kstream/internals/KTableKTableRightJoin.java | 7 ++-
.../streams/kstream/internals/KTableMapValues.java | 2 +-
.../kstream/internals/KTableRepartitionMap.java | 4 +-
.../kstream/internals/KTableTransformValues.java | 2 +-
.../kstream/internals/KTableValueGetter.java | 2 +-
.../internals/SessionCacheFlushListener.java | 10 ++--
.../kstream/internals/SessionTupleForwarder.java | 4 +-
.../internals/TimestampedCacheFlushListener.java | 10 ++--
.../internals/TimestampedTupleForwarder.java | 4 +-
.../internals/TransformerSupplierAdapter.java | 2 +-
.../ForeignJoinSubscriptionProcessorSupplier.java | 5 +-
.../SubscriptionJoinForeignProcessorSupplier.java | 5 +-
.../SubscriptionStoreReceiveProcessorSupplier.java | 5 +-
.../suppress/KTableSuppressProcessorSupplier.java | 6 +--
.../kafka/streams/processor/AbstractProcessor.java | 6 +--
.../apache/kafka/streams/processor/Processor.java | 2 +-
.../kafka/streams/processor/ProcessorContext.java | 16 +++---
.../internals/AbstractProcessorContext.java | 2 +-
.../ForwardingDisabledProcessorContext.java | 16 +++---
.../internals/GlobalProcessorContextImpl.java | 10 ++--
.../processor/internals/GlobalStateUpdateTask.java | 4 +-
.../processor/internals/GlobalStreamThread.java | 12 ++---
.../internals/InternalProcessorContext.java | 2 +-
.../processor/internals/ProcessorContextImpl.java | 23 +++++---
.../streams/processor/internals/ProcessorNode.java | 2 +-
.../streams/processor/internals/SourceNode.java | 2 +-
.../processor/internals/StandbyContextImpl.java | 16 +++---
.../streams/processor/internals/StreamTask.java | 8 +--
.../kafka/streams/TopologyTestDriverWrapper.java | 8 +--
.../internals/KStreamFlatTransformTest.java | 2 +-
.../internals/KStreamFlatTransformValuesTest.java | 2 +-
.../kstream/internals/KStreamPrintTest.java | 2 +-
...KStreamSessionWindowAggregateProcessorTest.java | 3 +-
.../kstream/internals/KStreamTransformTest.java | 4 +-
.../kstream/internals/KTableFilterTest.java | 5 +-
.../kstream/internals/KTableMapValuesTest.java | 5 +-
.../kstream/internals/KTableSourceTest.java | 3 +-
.../internals/KTableTransformValuesTest.java | 6 +--
.../internals/SessionCacheFlushListenerTest.java | 2 +-
.../internals/SessionTupleForwarderTest.java | 4 +-
.../TimestampedCacheFlushListenerTest.java | 4 +-
.../internals/TimestampedTupleForwarderTest.java | 4 +-
.../internals/AbstractProcessorContextTest.java | 2 +-
.../ForwardingDisabledProcessorContextTest.java | 61 ++++++++++++++++++++++
.../internals/GlobalProcessorContextImplTest.java | 4 +-
.../internals/ProcessorContextImplTest.java | 8 +--
.../processor/internals/StreamThreadTest.java | 2 +-
.../streams/state/KeyValueStoreTestDriver.java | 2 +-
.../state/internals/AbstractKeyValueStoreTest.java | 2 +-
.../state/internals/CachingKeyValueStoreTest.java | 2 +-
.../internals/InMemoryKeyValueLoggedStoreTest.java | 2 +-
.../state/internals/InMemoryKeyValueStoreTest.java | 2 +-
.../state/internals/InMemoryLRUCacheStoreTest.java | 2 +-
.../state/internals/RocksDBKeyValueStoreTest.java | 2 +-
.../apache/kafka/streams/tests/SmokeTestUtil.java | 2 +-
.../kafka/test/InternalMockProcessorContext.java | 2 +-
.../kafka/test/MockInternalProcessorContext.java | 10 ++--
.../java/org/apache/kafka/test/MockProcessor.java | 2 +-
.../org/apache/kafka/test/MockProcessorNode.java | 2 +-
.../apache/kafka/test/NoOpProcessorContext.java | 2 +-
.../streams/scala/FunctionsCompatConversions.scala | 6 +--
.../apache/kafka/streams/scala/TopologyTest.scala | 4 +-
.../kafka/streams/scala/kstream/KStreamTest.scala | 8 +--
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
.../streams/processor/MockProcessorContext.java | 2 +-
.../kafka/streams/TopologyTestDriverTest.java | 8 +--
78 files changed, 242 insertions(+), 192 deletions(-)
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
index f962f08..28fccfa 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
@@ -153,12 +153,12 @@ public class DeveloperGuideTesting {
}
public static class CustomMaxAggregator implements Processor<String, Long> {
- ProcessorContext<Object, Object> context;
+ ProcessorContext context;
private KeyValueStore<String, Long> store;
@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
this.context = context;
context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 9015d38..af8e87e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -64,7 +64,7 @@ public interface Transformer<K, V, R> {
*
* @param context the context
*/
- void init(final ProcessorContext<Object, Object> context);
+ void init(final ProcessorContext context);
/**
* Transform the record with the given key and value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index aae53be..987cae5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -69,7 +69,7 @@ public interface ValueTransformer<V, VR> {
* @throws IllegalStateException If store gets registered after initialization is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void init(final ProcessorContext<Void, Void> context);
+ void init(final ProcessorContext context);
/**
* Transform the given value to a new value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 55d61dd..be37b0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -72,7 +72,7 @@ public interface ValueTransformerWithKey<K, V, VR> {
* @throws IllegalStateException If store gets registered after initialization is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void init(final ProcessorContext<Void, Void> context);
+ void init(final ProcessorContext context);
/**
* Transform the given [key and ]value to a new value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 68a254a..3c6b591 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -111,7 +111,7 @@ public abstract class AbstractStream<K, V> {
final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
valueTransformer.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
index 097db8a..10ef15f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
@@ -46,7 +46,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupp
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
transformer.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
index bb5a651..40e4b37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
@@ -39,14 +39,14 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupp
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
- private ProcessorContext<Object, Object> context;
+ private ProcessorContext context;
KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
this.valueTransformer = valueTransformer;
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
this.context = context;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index d93674b..d0ce634 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,11 +49,11 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(new ForwardingDisabledProcessorContext(context));
+ valueGetter.init(context);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index dbe3048f..843606b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -39,14 +39,14 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
private final ValueTransformerWithKey<K, V, R> valueTransformer;
- private ProcessorContext<Object, Object> context;
+ private ProcessorContext context;
KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer) {
this.valueTransformer = valueTransformer;
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
this.context = context;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 7df2029..06701c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -76,11 +75,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(new ForwardingDisabledProcessorContext(context));
+ valueGetter.init(context);
}
@Override
@@ -136,7 +135,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 420055c..b6fd894 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -75,11 +74,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(new ForwardingDisabledProcessorContext(context));
+ valueGetter.init(context);
}
@Override
@@ -142,7 +141,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 3790107..391255d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -74,11 +73,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(new ForwardingDisabledProcessorContext(context));
+ valueGetter.init(context);
}
@Override
@@ -137,7 +136,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 9c93dc7..f499c2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -73,11 +72,11 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(new ForwardingDisabledProcessorContext(context));
+ valueGetter.init(context);
}
@Override
@@ -133,7 +132,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index b58c6ee..e734457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -135,7 +135,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
parentGetter.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index c1dedbd..a3d8b79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -104,14 +104,14 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
private final KTableValueGetter<K, V> parentGetter;
- private ProcessorContext<Void, Void> context;
+ private ProcessorContext context;
KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
this.context = context;
parentGetter.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index decf96e..86da063 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -134,7 +134,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
- public void init(final ProcessorContext<Void, Void> context) {
+ public void init(final ProcessorContext context) {
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index 6479690..a2695d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
public interface KTableValueGetter<K, V> {
- void init(ProcessorContext<Void, Void> context);
+ void init(ProcessorContext context);
ValueAndTimestamp<V> get(K key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index 3fe82c5..f40fdfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -24,11 +24,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
- private final InternalProcessorContext<Object, Object> context;
- private final ProcessorNode<?, ?> myNode;
+ private final InternalProcessorContext context;
+ private final ProcessorNode myNode;
- SessionCacheFlushListener(final ProcessorContext<Object, Object> context) {
- this.context = (InternalProcessorContext<Object, Object>) context;
+ SessionCacheFlushListener(final ProcessorContext context) {
+ this.context = (InternalProcessorContext) context;
myNode = this.context.currentNode();
}
@@ -37,7 +37,7 @@ class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>,
final V newValue,
final V oldValue,
final long timestamp) {
- final ProcessorNode<?, ?> prev = context.currentNode();
+ final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
index ee49979..bad255a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
@@ -32,13 +32,13 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
* @param <V>
*/
class SessionTupleForwarder<K, V> {
- private final ProcessorContext<Object, Object> context;
+ private final ProcessorContext context;
private final boolean sendOldValues;
private final boolean cachingEnabled;
@SuppressWarnings("unchecked")
SessionTupleForwarder(final StateStore store,
- final ProcessorContext<Object, Object> context,
+ final ProcessorContext context,
final CacheFlushListener<Windowed<K>, V> flushListener,
final boolean sendOldValues) {
this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
index ec9135e..5540376 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -26,11 +26,11 @@ import org.apache.kafka.streams.state.internals.CacheFlushListener;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
- private final InternalProcessorContext<Object, Object> context;
- private final ProcessorNode<?, ?> myNode;
+ private final InternalProcessorContext context;
+ private final ProcessorNode myNode;
- TimestampedCacheFlushListener(final ProcessorContext<Object, Object> context) {
- this.context = (InternalProcessorContext<Object, Object>) context;
+ TimestampedCacheFlushListener(final ProcessorContext context) {
+ this.context = (InternalProcessorContext) context;
myNode = this.context.currentNode();
}
@@ -39,7 +39,7 @@ class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, Value
final ValueAndTimestamp<V> newValue,
final ValueAndTimestamp<V> oldValue,
final long timestamp) {
- final ProcessorNode<?, ?> prev = context.currentNode();
+ final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 979798e..910dd8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -30,13 +30,13 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
* @param <V> the type of the value
*/
class TimestampedTupleForwarder<K, V> {
- private final ProcessorContext<Object, Object> context;
+ private final ProcessorContext context;
private final boolean sendOldValues;
private final boolean cachingEnabled;
@SuppressWarnings("unchecked")
TimestampedTupleForwarder(final StateStore store,
- final ProcessorContext<Object, Object> context,
+ final ProcessorContext context,
final TimestampedCacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
this.context = context;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java
index 0d828d3..7d0bf7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapter.java
@@ -38,7 +38,7 @@ public class TransformerSupplierAdapter<KIn, VIn, KOut, VOut> implements Transfo
private Transformer<KIn, VIn, KeyValue<KOut, VOut>> transformer = transformerSupplier.get();
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
transformer.init(context);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index 975c1df..fd95105 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -61,10 +61,9 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements Proc
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
- final InternalProcessorContext<Object, Object> internalProcessorContext =
- (InternalProcessorContext<Object, Object>) context;
+ final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
index e6c5347..2544eb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Objects;
@@ -57,10 +56,10 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
private KTableValueGetter<KO, VO> foreignValues;
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
foreignValues = foreignValueGetterSupplier.get();
- foreignValues.init(new ForwardingDisabledProcessorContext(context));
+ foreignValues.init(context);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
index 344311b..61fb1c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java
@@ -58,10 +58,9 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
private Sensor droppedRecordsSensor;
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
- final InternalProcessorContext<Object, Object> internalProcessorContext =
- (InternalProcessorContext<Object, Object>) context;
+ final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index 3ad4bc1..61f013c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -121,7 +121,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
private final String storeName;
private TimeOrderedKeyValueBuffer<K, V> buffer;
- private InternalProcessorContext<Object, Object> internalProcessorContext;
+ private InternalProcessorContext internalProcessorContext;
private Sensor suppressionEmitSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
@@ -138,8 +138,8 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext<Object, Object> context) {
- internalProcessorContext = (InternalProcessorContext<Object, Object>) context;
+ public void init(final ProcessorContext context) {
+ internalProcessorContext = (InternalProcessorContext) context;
suppressionEmitSensor = ProcessorNodeMetrics.suppressionEmitSensor(
Thread.currentThread().getName(),
context.taskId().toString(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index fa187a8..83abfca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -25,13 +25,13 @@ package org.apache.kafka.streams.processor;
*/
public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
- private ProcessorContext<Object, Object> context;
+ private ProcessorContext context;
protected AbstractProcessor() {
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
this.context = context;
}
@@ -51,7 +51,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
*
* @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
*/
- protected final ProcessorContext<Object, Object> context() {
+ protected final ProcessorContext context() {
return context;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index bd0814d..4046f2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -37,7 +37,7 @@ public interface Processor<K, V> {
*
* @param context the context; may not be null
*/
- void init(ProcessorContext<Object, Object> context);
+ void init(ProcessorContext context);
/**
* Process the record with the given key and value.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 50ab83c..1971c67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -27,11 +27,8 @@ import java.util.Map;
/**
* Processor context interface.
- *
- * @param <K> the type of input keys that can be forwarded
- * @param <V> the type of input values that can be forwarded
*/
-public interface ProcessorContext<K, V> {
+public interface ProcessorContext {
/**
* Returns the application id
@@ -161,12 +158,11 @@ public interface ProcessorContext<K, V> {
* @param interval the time interval between punctuations (supported minimum is 1 millisecond)
* @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
* @param callback a function consuming timestamps representing the current stream or system time
- * @throws IllegalArgumentException if the interval is under 1 millisecond
* @return a handle allowing cancellation of the punctuation schedule established by this method
*/
Cancellable schedule(final Duration interval,
final PunctuationType type,
- final Punctuator callback);
+ final Punctuator callback) throws IllegalArgumentException;
/**
* Forwards a key/value pair to all downstream processors.
@@ -175,7 +171,7 @@ public interface ProcessorContext<K, V> {
* @param key key
* @param value value
*/
- <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value);
+ <K, V> void forward(final K key, final V value);
/**
* Forwards a key/value pair to the specified downstream processors.
@@ -185,7 +181,7 @@ public interface ProcessorContext<K, V> {
* @param value value
* @param to the options to use when forwarding
*/
- <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to);
+ <K, V> void forward(final K key, final V value, final To to);
/**
* Forwards a key/value pair to one of the downstream processors designated by childIndex
@@ -196,7 +192,7 @@ public interface ProcessorContext<K, V> {
*/
// TODO when we remove this method, we can also remove `ProcessorNode#children`
@Deprecated
- <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final int childIndex);
+ <K, V> void forward(final K key, final V value, final int childIndex);
/**
* Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
@@ -206,7 +202,7 @@ public interface ProcessorContext<K, V> {
* @deprecated please use {@link #forward(Object, Object, To)} instead
*/
@Deprecated
- <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
+ <K, V> void forward(final K key, final V value, final String childName);
/**
* Requests a commit
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 70831ed..71dd68a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -31,7 +31,7 @@ import java.util.Map;
import java.util.Objects;
-public abstract class AbstractProcessorContext<K, V> implements InternalProcessorContext<K, V> {
+public abstract class AbstractProcessorContext implements InternalProcessorContext {
public static final String NONEXIST_TOPIC = "__null_topic__";
private final TaskId taskId;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 4b2aa5f..ba39368 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -37,10 +37,10 @@ import java.util.Objects;
/**
* {@code ProcessorContext} implementation that will throw on any forward call.
*/
-public final class ForwardingDisabledProcessorContext implements ProcessorContext<Void, Void> {
- private final ProcessorContext<?, ?> delegate;
+public final class ForwardingDisabledProcessorContext implements ProcessorContext {
+ private final ProcessorContext delegate;
- public ForwardingDisabledProcessorContext(final ProcessorContext<?, ?> delegate) {
+ public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}
@@ -96,29 +96,29 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
- final Punctuator callback) {
+ final Punctuator callback) throws IllegalArgumentException {
return delegate.schedule(interval, type, callback);
}
@Override
- public void forward(final Void key, final Void value) {
+ public <K, V> void forward(final K key, final V value) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
@Override
- public void forward(final Void key, final Void value, final To to) {
+ public <K, V> void forward(final K key, final V value, final To to) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
@Override
@Deprecated
- public void forward(final Void key, final Void value, final int childIndex) {
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
@Override
@Deprecated
- public void forward(final Void key, final Void value, final String childName) {
+ public <K, V> void forward(final K key, final V value, final String childName) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index ff5c26a..859430c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -38,7 +38,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.time.Duration;
-public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> {
+public class GlobalProcessorContextImpl extends AbstractProcessorContext {
public GlobalProcessorContextImpl(final StreamsConfig config,
@@ -69,7 +69,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
@SuppressWarnings("unchecked")
@Override
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value) {
+ public <K, V> void forward(final K key, final V value) {
final ProcessorNode<?, ?> previousNode = currentNode();
try {
for (final ProcessorNode<?, ?> child : currentNode().children()) {
@@ -85,7 +85,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
* No-op. This should only be called on GlobalStateStore#flush and there should be no child nodes
*/
@Override
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to) {
+ public <K, V> void forward(final K key, final V value, final To to) {
if (!currentNode().children().isEmpty()) {
throw new IllegalStateException("This method should only be called on 'GlobalStateStore.flush' that should not have any children.");
}
@@ -96,7 +96,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
*/
@Override
@Deprecated
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final int childIndex) {
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
}
@@ -105,7 +105,7 @@ public class GlobalProcessorContextImpl<K, V> extends AbstractProcessorContext<K
*/
@Override
@Deprecated
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName) {
+ public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index dcb3335..ddef7a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -35,7 +35,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
public class GlobalStateUpdateTask implements GlobalStateMaintainer {
private final ProcessorTopology topology;
- private final InternalProcessorContext<Object, Object> processorContext;
+ private final InternalProcessorContext processorContext;
private final Map<TopicPartition, Long> offsets = new HashMap<>();
private final Map<String, RecordDeserializer> deserializers = new HashMap<>();
private final GlobalStateManager stateMgr;
@@ -43,7 +43,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
private final LogContext logContext;
public GlobalStateUpdateTask(final ProcessorTopology topology,
- final InternalProcessorContext<Object, Object> processorContext,
+ final InternalProcessorContext processorContext,
final GlobalStateManager stateMgr,
final DeserializationExceptionHandler deserializationExceptionHandler,
final LogContext logContext) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index c681825..923480f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -320,13 +320,11 @@ public class GlobalStreamThread extends Thread {
stateRestoreListener,
config);
- final GlobalProcessorContextImpl<Object, Object> globalProcessorContext =
- new GlobalProcessorContextImpl<>(
- config,
- stateMgr,
- streamsMetrics,
- cache
- );
+ final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
+ config,
+ stateMgr,
+ streamsMetrics,
+ cache);
stateMgr.setGlobalProcessorContext(globalProcessorContext);
final StateConsumer stateConsumer = new StateConsumer(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index d25ce30..3e00ab2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
* {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
* {@link ThreadCache}
*/
-public interface InternalProcessorContext<K, V> extends ProcessorContext<K, V> {
+public interface InternalProcessorContext extends ProcessorContext {
@Override
StreamsMetricsImpl metrics();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 0e1b07a..d390af5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -45,7 +45,7 @@ import java.util.List;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
-public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> implements RecordCollector.Supplier {
+public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
private final StreamTask task;
private final RecordCollector collector;
@@ -126,13 +126,16 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
}
@Override
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value) {
+ public <K, V> void forward(final K key,
+ final V value) {
forward(key, value, SEND_TO_ALL);
}
@Override
@Deprecated
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final int childIndex) {
+ public <K, V> void forward(final K key,
+ final V value,
+ final int childIndex) {
forward(
key,
value,
@@ -141,13 +144,17 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
@Override
@Deprecated
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName) {
+ public <K, V> void forward(final K key,
+ final V value,
+ final String childName) {
forward(key, value, To.child(childName));
}
@SuppressWarnings("unchecked")
@Override
- public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to) {
+ public <K, V> void forward(final K key,
+ final V value,
+ final To to) {
final ProcessorNode<?, ?> previousNode = currentNode();
final ProcessorRecordContext previousContext = recordContext;
@@ -182,7 +189,9 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
}
}
- private <K1 extends K, V1 extends V> void forward(final ProcessorNode<K1, V1> child, final K1 key, final V1 value) {
+ private <K, V> void forward(final ProcessorNode<K, V> child,
+ final K key,
+ final V value) {
setCurrentNode(child);
child.process(key, value);
}
@@ -207,7 +216,7 @@ public class ProcessorContextImpl<K, V> extends AbstractProcessorContext<K, V> i
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
- final Punctuator callback) {
+ final Punctuator callback) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8b72ee3..f37ff2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -87,7 +87,7 @@ public class ProcessorNode<K, V> {
childByName.put(child.name, child);
}
- public void init(final InternalProcessorContext<Object, Object> context) {
+ public void init(final InternalProcessorContext context) {
try {
internalProcessorContext = context;
initSensors();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index c43afab..853520a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -30,7 +30,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
private final List<String> topics;
- private ProcessorContext<Object, Object> context;
+ private ProcessorContext context;
private Deserializer<K> keyDeserializer;
private Deserializer<V> valDeserializer;
private final TimestampExtractor timestampExtractor;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index f866521..9a94ad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.time.Duration;
-class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements RecordCollector.Supplier {
+class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
StandbyContextImpl(final TaskId id,
final StreamsConfig config,
@@ -104,7 +104,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
* @throws UnsupportedOperationException on every invocation
*/
@Override
- public void forward(final Void key, final Void value) {
+ public <K, V> void forward(final K key, final V value) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@@ -112,7 +112,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
* @throws UnsupportedOperationException on every invocation
*/
@Override
- public void forward(final Void key, final Void value, final To to) {
+ public <K, V> void forward(final K key, final V value, final To to) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@@ -121,7 +121,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
*/
@Override
@Deprecated
- public void forward(final Void key, final Void value, final int childIndex) {
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@@ -130,7 +130,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
*/
@Override
@Deprecated
- public void forward(final Void key, final Void value, final String childName) {
+ public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@@ -155,7 +155,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
* @throws UnsupportedOperationException on every invocation
*/
@Override
- public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+ public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
}
@@ -176,7 +176,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
}
@Override
- public void setCurrentNode(final ProcessorNode<?, ?> currentNode) {
+ public void setCurrentNode(final ProcessorNode currentNode) {
// no-op. can't throw as this is called on commit when the StateStores get flushed.
}
@@ -184,7 +184,7 @@ class StandbyContextImpl extends AbstractProcessorContext<Void, Void> implements
* @throws UnsupportedOperationException on every invocation
*/
@Override
- public ProcessorNode<?, ?> currentNode() {
+ public ProcessorNode currentNode() {
throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 51604e7..1d513a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -97,7 +97,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Sensor punctuateLatencySensor;
private final Sensor bufferedRecordsSensor;
private final Sensor enforcedProcessingSensor;
- private final InternalProcessorContext<Object, Object> processorContext;
+ private final InternalProcessorContext processorContext;
private long idleStartTimeMs;
private boolean commitNeeded = false;
@@ -153,7 +153,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
// initialize the topology with its own context
- processorContext = new ProcessorContextImpl<>(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache);
+ processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache);
final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
@@ -936,7 +936,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
- public ProcessorContext<Object, Object> context() {
+ public ProcessorContext context() {
return processorContext;
}
@@ -1007,7 +1007,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return recordCollector;
}
- InternalProcessorContext<Object, Object> processorContext() {
+ InternalProcessorContext processorContext() {
return processorContext;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
index 993a2ce..5c73bb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import java.util.Properties;
@@ -42,9 +42,9 @@ public class TopologyTestDriverWrapper extends TopologyTestDriver {
* @param processorName processor name to set as current node
* @return the processor context
*/
- public ProcessorContext<Object, Object> setCurrentNodeForProcessorContext(final String processorName) {
- final ProcessorContext<Object, Object> context = task.context();
- ((InternalProcessorContext<Object, Object>) context).setCurrentNode(getProcessor(processorName));
+ public ProcessorContext setCurrentNodeForProcessorContext(final String processorName) {
+ final ProcessorContext context = task.context();
+ ((ProcessorContextImpl) context).setCurrentNode(getProcessor(processorName));
return context;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index 54baf2c..d18a7a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -38,7 +38,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
private Number inputValue;
private Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
- private ProcessorContext<Object, Object> context;
+ private ProcessorContext context;
private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
index 2b8d44f..36167c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -38,7 +38,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
private Integer inputValue;
private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
- private ProcessorContext<Object, Object> context;
+ private ProcessorContext context;
private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 07f43be..9906556 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -45,7 +45,7 @@ public class KStreamPrintTest {
"test-stream"));
printProcessor = kStreamPrint.get();
- final ProcessorContext<Object, Object> processorContext = EasyMock.createNiceMock(ProcessorContext.class);
+ final ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
EasyMock.replay(processorContext);
printProcessor.init(processorContext);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 9bdae4e..d3fe80a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.To;
@@ -299,7 +298,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldGetAggregatedValuesFromValueGetter() {
final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
- getter.init(new ForwardingDisabledProcessorContext(context));
+ getter.init(context);
context.setTime(0);
processor.process("a", "1");
context.setTime(GAP_MS + 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 521ad5e..1f37b3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -56,7 +56,7 @@ public class KStreamTransformTest {
private int total = 0;
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
context.schedule(
Duration.ofMillis(1),
PunctuationType.WALL_CLOCK_TIME,
@@ -122,7 +122,7 @@ public class KStreamTransformTest {
private int total = 0;
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
context.schedule(
Duration.ofMillis(1),
PunctuationType.WALL_CLOCK_TIME,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index eaf6f77..f4c1854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.TestInputTopic;
@@ -151,8 +150,8 @@ public class KTableFilterTest {
final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- getter2.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table2.name)));
- getter3.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table3.name)));
+ getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+ getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
inputTopic.pipeInput("A", 1, 5L);
inputTopic.pipeInput("B", 1, 10L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 346fded..28b0342 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -122,8 +121,8 @@ public class KTableMapValuesTest {
final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- getter2.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table2.name)));
- getter3.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table3.name)));
+ getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+ getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
inputTopic1.pipeInput("A", "01", 50L);
inputTopic1.pipeInput("B", "01", 10L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index fc6ac7a..236ea58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -189,7 +188,7 @@ public class KTableSourceTest {
Duration.ZERO
);
final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(new ForwardingDisabledProcessorContext(driver.setCurrentNodeForProcessorContext(table1.name)));
+ getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
inputTopic1.pipeInput("A", "01", 10L);
inputTopic1.pipeInput("B", "01", 20L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index c850e7f..80cdae4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -84,7 +84,7 @@ public class KTableTransformValuesTest {
@Mock(MockType.NICE)
private KTableImpl<String, String, String> parent;
@Mock(MockType.NICE)
- private InternalProcessorContext<Object, Object> context;
+ private InternalProcessorContext context;
@Mock(MockType.NICE)
private KTableValueGetterSupplier<String, String> parentGetterSupplier;
@Mock(MockType.NICE)
@@ -207,7 +207,7 @@ public class KTableTransformValuesTest {
replay(parent, parentGetterSupplier, parentGetter);
final KTableValueGetter<String, String> getter = transformValues.view().get();
- getter.init(new ForwardingDisabledProcessorContext(context));
+ getter.init(context);
final String result = getter.get("Key").value();
@@ -224,7 +224,7 @@ public class KTableTransformValuesTest {
replay(context, stateStore);
final KTableValueGetter<String, String> getter = transformValues.view().get();
- getter.init(new ForwardingDisabledProcessorContext(context));
+ getter.init(context);
final String result = getter.get("Key").value();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index 1a1f413..b25febf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -30,7 +30,7 @@ import static org.easymock.EasyMock.verify;
public class SessionCacheFlushListenerTest {
@Test
public void shouldForwardKeyNewValueOldValueAndTimestamp() {
- final InternalProcessorContext<Object, Object> context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
index b2a4a54..e99c684 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
@@ -57,7 +57,7 @@ public class SessionTupleForwarderTest {
private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+ final ProcessorContext context = mock(ProcessorContext.class);
expect(store.setFlushListener(null, sendOldValued)).andReturn(false);
if (sendOldValued) {
@@ -83,7 +83,7 @@ public class SessionTupleForwarderTest {
@Test
public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+ final ProcessorContext context = mock(ProcessorContext.class);
expect(store.setFlushListener(null, false)).andReturn(true);
replay(store, context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 90123cc..38ef5c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -31,7 +31,7 @@ public class TimestampedCacheFlushListenerTest {
@Test
public void shouldForwardValueTimestampIfNewValueExists() {
- final InternalProcessorContext<Object, Object> context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
@@ -53,7 +53,7 @@ public class TimestampedCacheFlushListenerTest {
@Test
public void shouldForwardParameterTimestampIfNewValueIsNull() {
- final InternalProcessorContext<Object, Object> context = mock(InternalProcessorContext.class);
+ final InternalProcessorContext context = mock(InternalProcessorContext.class);
expect(context.currentNode()).andReturn(null).anyTimes();
context.setCurrentNode(null);
context.setCurrentNode(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index e8f9e07..52a5fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -57,7 +57,7 @@ public class TimestampedTupleForwarderTest {
private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+ final ProcessorContext context = mock(ProcessorContext.class);
expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
if (sendOldValues) {
@@ -81,7 +81,7 @@ public class TimestampedTupleForwarderTest {
@Test
public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
- final ProcessorContext<Object, Object> context = mock(ProcessorContext.class);
+ final ProcessorContext context = mock(ProcessorContext.class);
expect(store.setFlushListener(null, false)).andReturn(true);
replay(store, context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 8002d09..4a3ee7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -184,7 +184,7 @@ public class AbstractProcessorContextTest {
}
- private static class TestProcessorContext extends AbstractProcessorContext<Object, Object> {
+ private static class TestProcessorContext extends AbstractProcessorContext {
static Properties config;
static {
config = getStreamsConfig();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
new file mode 100644
index 0000000..c6b2cbe
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class ForwardingDisabledProcessorContextTest {
+ @Mock(MockType.NICE)
+ private ProcessorContext delegate;
+ private ForwardingDisabledProcessorContext context;
+
+ @Before
+ public void setUp() {
+ context = new ForwardingDisabledProcessorContext(delegate);
+ }
+
+ @Test(expected = StreamsException.class)
+ public void shouldThrowOnForward() {
+ context.forward("key", "value");
+ }
+
+ @Test(expected = StreamsException.class)
+ public void shouldThrowOnForwardWithTo() {
+ context.forward("key", "value", To.all());
+ }
+
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
+ @Test(expected = StreamsException.class)
+ public void shouldThrowOnForwardWithChildIndex() {
+ context.forward("key", "value", 1);
+ }
+
+ @SuppressWarnings("deprecation") // need to test deprecated code until removed
+ @Test(expected = StreamsException.class)
+ public void shouldThrowOnForwardWithChildName() {
+ context.forward("key", "value", "child1");
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 60a8a5e..8443e55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -51,7 +51,7 @@ public class GlobalProcessorContextImplTest {
private static final String UNKNOWN_STORE = "unknown-store";
private static final String CHILD_PROCESSOR = "child";
- private GlobalProcessorContextImpl<Object, Object> globalContext;
+ private GlobalProcessorContextImpl globalContext;
private ProcessorNode<?, ?> child;
private ProcessorRecordContext recordContext;
@@ -74,7 +74,7 @@ public class GlobalProcessorContextImplTest {
expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
replay(stateManager);
- globalContext = new GlobalProcessorContextImpl<>(
+ globalContext = new GlobalProcessorContextImpl(
streamsConfig,
stateManager,
null,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 9255053..5b52e9d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -55,7 +55,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ProcessorContextImplTest {
- private ProcessorContextImpl<Object, Object> context;
+ private ProcessorContextImpl context;
private static final String KEY = "key";
private static final long VALUE = 42L;
@@ -121,7 +121,7 @@ public class ProcessorContextImplTest {
replay(stateManager);
- context = new ProcessorContextImpl<>(
+ context = new ProcessorContextImpl(
mock(TaskId.class),
mock(StreamTask.class),
streamsConfig,
@@ -524,10 +524,10 @@ public class ProcessorContextImplTest {
}
private <T extends StateStore> void doTest(final String name, final Consumer<T> checker) {
- final Processor<String, Long> processor = new Processor<String, Long>() {
+ final Processor processor = new Processor<String, Long>() {
@Override
@SuppressWarnings("unchecked")
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
final T store = (T) context.getStateStore(name);
checker.accept(store);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index ff632eb..2fdba0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1481,7 +1481,7 @@ public class StreamThreadTest {
final List<Long> punctuatedWallClockTime = new ArrayList<>();
final ProcessorSupplier<Object, Object> punctuateProcessor = () -> new Processor<Object, Object>() {
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index fce8ce1..89740c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -331,7 +331,7 @@ public class KeyValueStoreTestDriver<K, V> {
* @return the processing context; never null
* @see #addEntryToRestoreLog(Object, Object)
*/
- public ProcessorContext<Object, Object> context() {
+ public ProcessorContext context() {
return context;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 3927d28..ca5fd2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -50,7 +50,7 @@ import static org.junit.Assert.fail;
public abstract class AbstractKeyValueStoreTest {
- protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context);
+ protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);
protected InternalMockProcessorContext context;
protected KeyValueStore<Integer, String> store;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 92f3e99..ffc8134 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -87,7 +87,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("cache-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 330d53c..7c0d16c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -28,7 +28,7 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 2af4130..62f8949 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -31,7 +31,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 9aa100a..2a86cdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -36,7 +36,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.lruMap("my-store", 10),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5e79051..504aa9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -38,7 +38,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext<Object, Object> context) {
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 441448c..990bfdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -48,7 +48,7 @@ public class SmokeTestUtil {
private long largestOffset = Long.MIN_VALUE;
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
System.out.flush();
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index cde5735..c5aedb5 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -54,7 +54,7 @@ import java.util.Map;
import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
public class InternalMockProcessorContext
- extends AbstractProcessorContext<Object, Object>
+ extends AbstractProcessorContext
implements RecordCollector.Supplier {
private final File stateDir;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 6817504..38b4160 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -32,12 +32,10 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
-public class MockInternalProcessorContext
- extends MockProcessorContext
- implements InternalProcessorContext<Object, Object> {
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
- private ProcessorNode<?, ?> currentNode;
+ private ProcessorNode currentNode;
private RecordCollector recordCollector;
public MockInternalProcessorContext() {
@@ -69,12 +67,12 @@ public class MockInternalProcessorContext
}
@Override
- public void setCurrentNode(final ProcessorNode<?, ?> currentNode) {
+ public void setCurrentNode(final ProcessorNode currentNode) {
this.currentNode = currentNode;
}
@Override
- public ProcessorNode<?, ?> currentNode() {
+ public ProcessorNode currentNode() {
return currentNode;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 70b93ea..8e3ce7d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -56,7 +56,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
super.init(context);
if (scheduleInterval > 0L) {
scheduleCancellable = context.schedule(
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 60f6045..718e5af 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -52,7 +52,7 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
}
@Override
- public void init(final InternalProcessorContext<Object, Object> context) {
+ public void init(final InternalProcessorContext context) {
super.init(context);
initialized = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index d494f10..77dd418 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -33,7 +33,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-public class NoOpProcessorContext extends AbstractProcessorContext<Object, Object> {
+public class NoOpProcessorContext extends AbstractProcessorContext {
public boolean initialized;
@SuppressWarnings("WeakerAccess")
public Map<Object, Object> forwardedValues = new HashMap<>();
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
index fdce551..c3c6403 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
@@ -97,7 +97,7 @@ private[scala] object FunctionsCompatConversions {
val innerTransformer = supplier.get()
new Transformer[K, V, JIterable[VO]] {
override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
- override def init(context: ProcessorContext[Object, Object]): Unit = innerTransformer.init(context)
+ override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
override def close(): Unit = innerTransformer.close()
}
}
@@ -108,7 +108,7 @@ private[scala] object FunctionsCompatConversions {
val innerTransformer = supplier.get()
new ValueTransformer[V, JIterable[VO]] {
override def transform(value: V): JIterable[VO] = innerTransformer.transform(value).asJava
- override def init(context: ProcessorContext[Void, Void]): Unit = innerTransformer.init(context)
+ override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
override def close(): Unit = innerTransformer.close()
}
}
@@ -120,7 +120,7 @@ private[scala] object FunctionsCompatConversions {
val innerTransformer = supplier.get()
new ValueTransformerWithKey[K, V, JIterable[VO]] {
override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
- override def init(context: ProcessorContext[Void, Void]): Unit = innerTransformer.init(context)
+ override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
override def close(): Unit = innerTransformer.close()
}
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 616ee5d..3107db6 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -291,7 +291,7 @@ class TopologyTest {
.transform(
() =>
new Transformer[String, String, KeyValue[String, String]] {
- override def init(context: ProcessorContext[Object, Object]): Unit = ()
+ override def init(context: ProcessorContext): Unit = ()
override def transform(key: String, value: String): KeyValue[String, String] =
new KeyValue(key, value.toLowerCase)
override def close(): Unit = ()
@@ -312,7 +312,7 @@ class TopologyTest {
val lowered: KStreamJ[String, String] = textLines.transform(
() =>
new Transformer[String, String, KeyValue[String, String]] {
- override def init(context: ProcessorContext[Object, Object]): Unit = ()
+ override def init(context: ProcessorContext): Unit = ()
override def transform(key: String, value: String): KeyValue[String, String] =
new KeyValue(key, value.toLowerCase)
override def close(): Unit = ()
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 66bf313..e5a0aad 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -185,7 +185,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
"transform a KStream" should "transform correctly records" in {
class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
- override def init(context: ProcessorContext[Object, Object]): Unit = {}
+ override def init(context: ProcessorContext): Unit = {}
override def transform(key: String, value: String): KeyValue[String, String] =
new KeyValue(s"$key-transformed", s"$value-transformed")
override def close(): Unit = {}
@@ -217,7 +217,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
"flatTransform a KStream" should "flatTransform correctly records" in {
class TestTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] {
- override def init(context: ProcessorContext[Object, Object]): Unit = {}
+ override def init(context: ProcessorContext): Unit = {}
override def transform(key: String, value: String): Iterable[KeyValue[String, String]] =
Array(new KeyValue(s"$key-transformed", s"$value-transformed"))
override def close(): Unit = {}
@@ -249,7 +249,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
"flatTransformValues a KStream" should "correctly flatTransform values in records" in {
class TestTransformer extends ValueTransformer[String, Iterable[String]] {
- override def init(context: ProcessorContext[Void, Void]): Unit = {}
+ override def init(context: ProcessorContext): Unit = {}
override def transform(value: String): Iterable[String] =
Array(s"$value-transformed")
override def close(): Unit = {}
@@ -282,7 +282,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
"flatTransformValues with key in a KStream" should "correctly flatTransformValues in records" in {
class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
- override def init(context: ProcessorContext[Void, Void]): Unit = {}
+ override def init(context: ProcessorContext): Unit = {}
override def transform(key: String, value: String): Iterable[String] =
Array(s"$value-transformed-$key")
override def close(): Unit = {}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 0a5a560..bfcfb87 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -425,8 +425,8 @@ public class TopologyTestDriver implements Closeable {
streamsConfig
);
- final GlobalProcessorContextImpl<Object, Object> globalProcessorContext =
- new GlobalProcessorContextImpl<>(streamsConfig, globalStateManager, streamsMetrics, cache);
+ final GlobalProcessorContextImpl globalProcessorContext =
+ new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
globalStateManager.setGlobalProcessorContext(globalProcessorContext);
globalStateTask = new GlobalStateUpdateTask(
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index fbca146..73da6ef 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -55,7 +55,7 @@ import java.util.Properties;
* If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
* {@link Topology} and using the {@link TopologyTestDriver}.
*/
-public class MockProcessorContext implements ProcessorContext<Object, Object>, RecordCollector.Supplier {
+public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
// Immutable fields ================================================
private final StreamsMetricsImpl metrics;
private final TaskId taskId;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index ee6853f..fb5585f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -236,7 +236,7 @@ public class TopologyTestDriverTest {
private final static class MockProcessor implements Processor<Object, Object> {
private final Collection<Punctuation> punctuations;
- private ProcessorContext<Object, Object> context;
+ private ProcessorContext context;
private boolean initialized = false;
private boolean closed = false;
@@ -247,7 +247,7 @@ public class TopologyTestDriverTest {
}
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
initialized = true;
this.context = context;
for (final Punctuation punctuation : punctuations) {
@@ -1284,12 +1284,12 @@ public class TopologyTestDriverTest {
}
private static class CustomMaxAggregator implements Processor<String, Long> {
- ProcessorContext<Object, Object> context;
+ ProcessorContext context;
private KeyValueStore<String, Long> store;
@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext<Object, Object> context) {
+ public void init(final ProcessorContext context) {
this.context = context;
context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore());
context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> flushStore());