You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/08/10 02:09:36 UTC
[kafka] branch trunk updated: KAFKA-10261: Introduce the KIP-478
apis with adapters (#9004)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92828d5 KAFKA-10261: Introduce the KIP-478 apis with adapters (#9004)
92828d5 is described below
commit 92828d53b18703000159f4dd7dc8b3170667db25
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sun Aug 9 21:05:25 2020 -0500
KAFKA-10261: Introduce the KIP-478 apis with adapters (#9004)
Adds the new Processor and ProcessorContext interfaces
as proposed in KIP-478. To integrate in a staged fashion
with the code base, adapters are included to convert back
and forth between the new and old APIs.
ProcessorNode is converted to the new APIs.
Reviewers: Boyang Chen <bo...@confluent.io>
---
.../kafka/streams/processor/api/Processor.java | 64 ++++++
.../streams/processor/api/ProcessorContext.java | 239 ++++++++++++++++++++
.../streams/processor/api/ProcessorSupplier.java | 43 ++++
.../internals/AbstractProcessorContext.java | 6 +-
.../internals/GlobalProcessorContextImpl.java | 8 +-
.../processor/internals/GlobalStateUpdateTask.java | 6 +-
...ntext.java => InternalApiProcessorContext.java} | 14 +-
.../internals/InternalProcessorContext.java | 6 +-
.../internals/InternalTopologyBuilder.java | 171 +++++++-------
.../processor/internals/PartitionGroup.java | 2 +-
.../processor/internals/ProcessorAdapter.java | 53 +++++
.../internals/ProcessorContextAdapter.java | 234 +++++++++++++++++++
.../processor/internals/ProcessorContextImpl.java | 14 +-
.../internals/ProcessorContextReverseAdapter.java | 248 +++++++++++++++++++++
.../streams/processor/internals/ProcessorNode.java | 27 ++-
.../internals/ProcessorNodePunctuator.java | 2 +-
.../processor/internals/ProcessorTopology.java | 44 ++--
.../processor/internals/RecordDeserializer.java | 6 +-
.../streams/processor/internals/RecordQueue.java | 6 +-
.../streams/processor/internals/SinkNode.java | 26 +--
.../streams/processor/internals/SourceNode.java | 24 +-
.../streams/processor/internals/StreamTask.java | 14 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 2 +-
.../kafka/streams/TopologyTestDriverWrapper.java | 6 +-
.../streams/kstream/internals/KStreamImplTest.java | 8 +-
.../internals/GlobalProcessorContextImplTest.java | 19 +-
.../processor/internals/GlobalStateTaskTest.java | 10 +-
.../internals/InternalTopologyBuilderTest.java | 4 +-
.../internals/ProcessorContextImplTest.java | 2 +-
.../processor/internals/ProcessorNodeTest.java | 4 +-
.../internals/ProcessorTopologyFactories.java | 4 +-
.../processor/internals/PunctuationQueueTest.java | 2 +-
.../internals/RecordDeserializerTest.java | 2 +-
.../processor/internals/RecordQueueTest.java | 2 +-
.../streams/processor/internals/SinkNodeTest.java | 4 +-
.../processor/internals/SourceNodeTest.java | 6 +-
.../processor/internals/StreamTaskTest.java | 18 +-
.../kafka/test/InternalMockProcessorContext.java | 6 +-
.../org/apache/kafka/test/MockProcessorNode.java | 12 +-
.../java/org/apache/kafka/test/MockSourceNode.java | 10 +-
40 files changed, 1130 insertions(+), 248 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
new file mode 100644
index 0000000..d3656c7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.time.Duration;
+
+/**
+ * A processor of key-value pair records.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+public interface Processor<KIn, VIn, KOut, VOut> {
+
+ /**
+ * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
+ * that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the
+ * framework may later re-use the processor by calling {@code #init()} again.
+ * <p>
+ * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
+ * {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) schedule} a method to be
+ * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s.
+ *
+ * @param context the context; may not be null
+ */
+ default void init(final ProcessorContext<KOut, VOut> context) {}
+
+ /**
+ * Process the record with the given key and value.
+ *
+ * @param key the key for the record
+ * @param value the value for the record
+ */
+ void process(KIn key, VIn value);
+
+ /**
+ * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup.
+ * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may
+ * later re-use this processor by calling {@code #init()} on it again.
+ * <p>
+ * Note: Do not close any streams managed resources, like {@link StateStore}s here, as they are managed by the library.
+ */
+ default void close() {}
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
new file mode 100644
index 0000000..4b98e6a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * Processor context interface.
+ *
+ * @param <KForward> a bound on the types of keys that may be forwarded
+ * @param <VForward> a bound on the types of values that may be forwarded
+ */
+public interface ProcessorContext<KForward, VForward> {
+
+ /**
+ * Returns the application id.
+ *
+ * @return the application id
+ */
+ String applicationId();
+
+ /**
+ * Returns the task id.
+ *
+ * @return the task id
+ */
+ TaskId taskId();
+
+ /**
+ * Returns the default key serde.
+ *
+ * @return the key serializer
+ */
+ Serde<?> keySerde();
+
+ /**
+ * Returns the default value serde.
+ *
+ * @return the value serializer
+ */
+ Serde<?> valueSerde();
+
+ /**
+ * Returns the state directory for the partition.
+ *
+ * @return the state directory
+ */
+ File stateDir();
+
+ /**
+ * Returns Metrics instance.
+ *
+ * @return StreamsMetrics
+ */
+ StreamsMetrics metrics();
+
+ /**
+ * Registers and possibly restores the specified storage engine.
+ *
+ * @param store the storage engine
+ * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
+ *
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
+ void register(final StateStore store,
+ final StateRestoreCallback stateRestoreCallback);
+
+ /**
+ * Get the state store given the store name.
+ *
+ * @param name The store name
+ * @return The state store instance
+ */
+ StateStore getStateStore(final String name);
+
+ /**
+ * Schedules a periodic operation for processors. A processor may call this method during
+ * {@link Processor#init(org.apache.kafka.streams.processor.ProcessorContext) initialization} or
+ * {@link Processor#process(Object, Object) processing} to
+ * schedule a periodic callback — called a punctuation — to {@link Punctuator#punctuate(long)}.
+ * The type parameter controls what notion of time is used for punctuation:
+ * <ul>
+ * <li>{@link PunctuationType#STREAM_TIME} — uses "stream time", which is advanced by the processing of messages
+ * in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+ * The first punctuation will be triggered by the first record that is processed.
+ * <b>NOTE:</b> Only advanced if messages arrive</li>
+ * <li>{@link PunctuationType#WALL_CLOCK_TIME} — uses system time (the wall-clock time),
+ * which is advanced independent of whether new messages arrive.
+ * The first punctuation will be triggered after interval has elapsed.
+ * <b>NOTE:</b> This is best effort only as its granularity is limited by how long an iteration of the
+ * processing loop takes to complete</li>
+ * </ul>
+ *
+ * <b>Skipping punctuations:</b> Punctuations will not be triggered more than once at any given timestamp.
+ * This means that "missed" punctuation will be skipped.
+ * It's possible to "miss" a punctuation if:
+ * <ul>
+ * <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances more than interval</li>
+ * <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval, ...</li>
+ * </ul>
+ *
+ * @param interval the time interval between punctuations (supported minimum is 1 millisecond)
+ * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
+ * @param callback a function consuming timestamps representing the current stream or system time
+ * @return a handle allowing cancellation of the punctuation schedule established by this method
+ */
+ Cancellable schedule(final Duration interval,
+ final PunctuationType type,
+ final Punctuator callback);
+
+ /**
+ * Forwards a key/value pair to all downstream processors.
+ * Used the input record's timestamp as timestamp for the output record.
+ *
+ * @param key key
+ * @param value value
+ */
+ <K extends KForward, V extends VForward> void forward(final K key, final V value);
+
+ /**
+ * Forwards a key/value pair to the specified downstream processors.
+ * Can be used to set the timestamp of the output record.
+ *
+ * @param key key
+ * @param value value
+ * @param to the options to use when forwarding
+ */
+ <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+
+ /**
+ * Requests a commit.
+ */
+ void commit();
+
+ /**
+ * Returns the topic name of the current input record; could be null if it is not
+ * available (for example, if this method is invoked from the punctuate call).
+ *
+ * @return the topic name
+ */
+ String topic();
+
+ /**
+ * Returns the partition id of the current input record; could be -1 if it is not
+ * available (for example, if this method is invoked from the punctuate call).
+ *
+ * @return the partition id
+ */
+ int partition();
+
+ /**
+ * Returns the offset of the current input record; could be -1 if it is not
+ * available (for example, if this method is invoked from the punctuate call).
+ *
+ * @return the offset
+ */
+ long offset();
+
+ /**
+ * Returns the headers of the current input record; could be null if it is not
+ * available (for example, if this method is invoked from the punctuate call).
+ *
+ * @return the headers
+ */
+ Headers headers();
+
+ /**
+ * Returns the current timestamp.
+ *
+ * <p> If it is triggered while processing a record streamed from the source processor,
+ * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+ * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+ *
+ * <p> If it is triggered while processing a record generated not from the source processor (for example,
+ * if this method is invoked from the punctuate call), timestamp is defined as the current
+ * task's stream time, which is defined as the largest timestamp of any record processed by the task.
+ *
+ * @return the timestamp
+ */
+ long timestamp();
+
+ /**
+ * Returns all the application config properties as key/value pairs.
+ *
+ * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+ * object and associated to the ProcessorContext.
+ *
+ * <p> The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property
+ * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG}
+ * will be of type {@link Class}, even if it was specified as a String to
+ * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}).
+ *
+ * @return all the key/values from the StreamsConfig properties
+ */
+ Map<String, Object> appConfigs();
+
+ /**
+ * Returns all the application config properties with the given key prefix, as key/value pairs
+ * stripping the prefix.
+ *
+ * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
+ * object and associated to the ProcessorContext.
+ *
+ * @param prefix the properties prefix
+ * @return the key/values matching the given prefix from the StreamsConfig properties.
+ */
+ Map<String, Object> appConfigsWithPrefix(final String prefix);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorSupplier.java
new file mode 100644
index 0000000..25c82c5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+
+/**
+ * A processor supplier that can create one or more {@link Processor} instances.
+ *
+ * It is used in {@link Topology} for adding new processor operators, whose generated
+ * topology can then be replicated (and thus creating one or more {@link Processor} instances)
+ * and distributed to multiple stream threads.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+@FunctionalInterface
+public interface ProcessorSupplier<KIn, VIn, KOut, VOut> extends ConnectedStoreProvider {
+
+ /**
+ * Return a new {@link Processor} instance.
+ *
+ * @return a new {@link Processor} instance
+ */
+ Processor<KIn, VIn, KOut, VOut> get();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 1befc05..6012817 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -42,7 +42,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
private final Serde<?> valueSerde;
private boolean initialized;
protected ProcessorRecordContext recordContext;
- protected ProcessorNode<?, ?> currentNode;
+ protected ProcessorNode<?, ?, ?, ?> currentNode;
private long currentSystemTimeMs;
protected ThreadCache cache;
@@ -196,12 +196,12 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
}
@Override
- public void setCurrentNode(final ProcessorNode<?, ?> currentNode) {
+ public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
this.currentNode = currentNode;
}
@Override
- public ProcessorNode<?, ?> currentNode() {
+ public ProcessorNode<?, ?, ?, ?> currentNode() {
return currentNode;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 57989df..695eb77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -57,12 +57,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
@SuppressWarnings("unchecked")
@Override
- public <K, V> void forward(final K key, final V value) {
- final ProcessorNode<?, ?> previousNode = currentNode();
+ public <KIn, VIn> void forward(final KIn key, final VIn value) {
+ final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
try {
- for (final ProcessorNode<?, ?> child : currentNode().children()) {
+ for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
setCurrentNode(child);
- ((ProcessorNode<K, V>) child).process(key, value);
+ ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
}
} finally {
setCurrentNode(previousNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index b557330..700f135 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -68,7 +68,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
for (final String storeName : storeNames) {
final String sourceTopic = storeNameToTopic.get(storeName);
- final SourceNode<?, ?> source = topology.source(sourceTopic);
+ final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic);
deserializers.put(
sourceTopic,
new RecordDeserializer(
@@ -104,7 +104,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
deserialized.headers());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
- ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
+ ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
}
offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
@@ -131,7 +131,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
}
private void initTopology() {
- for (final ProcessorNode<?, ?> node : this.topology.processors()) {
+ for (final ProcessorNode<?, ?, ?, ?> node : this.topology.processors()) {
processorContext.setCurrentNode(node);
try {
node.init(this.processorContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
similarity index 86%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
index 8d2b324..39c3084 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
@@ -16,12 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -33,9 +31,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
* {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
* {@link ThreadCache}
*/
-public interface InternalProcessorContext extends ProcessorContext {
- BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
- ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
+public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {
@Override
StreamsMetricsImpl metrics();
@@ -46,7 +42,7 @@ public interface InternalProcessorContext extends ProcessorContext {
void setSystemTimeMs(long timeMs);
/**
- * @retun the current wall-clock system timestamp in milliseconds
+ * @return the current wall-clock system timestamp in milliseconds
*/
long currentSystemTimeMs();
@@ -64,12 +60,12 @@ public interface InternalProcessorContext extends ProcessorContext {
/**
* @param currentNode the current {@link ProcessorNode}
*/
- void setCurrentNode(ProcessorNode<?, ?> currentNode);
+ void setCurrentNode(ProcessorNode<?, ?, ?, ?> currentNode);
/**
* Get the current {@link ProcessorNode}
*/
- ProcessorNode<?, ?> currentNode();
+ ProcessorNode<?, ?, ?, ?> currentNode();
/**
* Get the thread-global cache
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 8d2b324..8e4ec25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -46,7 +46,7 @@ public interface InternalProcessorContext extends ProcessorContext {
void setSystemTimeMs(long timeMs);
/**
- * @retun the current wall-clock system timestamp in milliseconds
+ * @return the current wall-clock system timestamp in milliseconds
*/
long currentSystemTimeMs();
@@ -64,12 +64,12 @@ public interface InternalProcessorContext extends ProcessorContext {
/**
* @param currentNode the current {@link ProcessorNode}
*/
- void setCurrentNode(ProcessorNode<?, ?> currentNode);
+ void setCurrentNode(ProcessorNode<?, ?, ?, ?> currentNode);
/**
* Get the current {@link ProcessorNode}
*/
- ProcessorNode<?, ?> currentNode();
+ ProcessorNode<?, ?, ?, ?> currentNode();
/**
* Get the thread-global cache
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9844341..a915b60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -61,7 +61,7 @@ public class InternalTopologyBuilder {
private static final String[] NO_PREDECESSORS = {};
// node factories in a topological order
- private final Map<String, NodeFactory<?, ?>> nodeFactories = new LinkedHashMap<>();
+ private final Map<String, NodeFactory<?, ?, ?, ?>> nodeFactories = new LinkedHashMap<>();
private final Map<String, StateStoreFactory<?>> stateFactories = new HashMap<>();
@@ -176,7 +176,7 @@ public class InternalTopologyBuilder {
}
}
- private static abstract class NodeFactory<K, V> {
+ private static abstract class NodeFactory<KIn, VIn, KOut, VOut> {
final String name;
final String[] predecessors;
@@ -186,18 +186,18 @@ public class InternalTopologyBuilder {
this.predecessors = predecessors;
}
- public abstract ProcessorNode<K, V> build();
+ public abstract ProcessorNode<KIn, VIn, KOut, VOut> build();
abstract AbstractNode describe();
}
- private static class ProcessorNodeFactory<K, V> extends NodeFactory<K, V> {
- private final ProcessorSupplier<K, V> supplier;
+ private static class ProcessorNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
+ private final ProcessorSupplier<KIn, VIn> supplier;
private final Set<String> stateStoreNames = new HashSet<>();
ProcessorNodeFactory(final String name,
final String[] predecessors,
- final ProcessorSupplier<K, V> supplier) {
+ final ProcessorSupplier<KIn, VIn> supplier) {
super(name, predecessors.clone());
this.supplier = supplier;
}
@@ -207,7 +207,7 @@ public class InternalTopologyBuilder {
}
@Override
- public ProcessorNode<K, V> build() {
+ public ProcessorNode<KIn, VIn, KOut, VOut> build() {
return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
}
@@ -221,19 +221,19 @@ public class InternalTopologyBuilder {
// even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory
private final Map<String, Pattern> topicToPatterns = new HashMap<>();
- private class SourceNodeFactory<K, V> extends NodeFactory<K, V> {
+ private class SourceNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
private final List<String> topics;
private final Pattern pattern;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valDeserializer;
+ private final Deserializer<KIn> keyDeserializer;
+ private final Deserializer<VIn> valDeserializer;
private final TimestampExtractor timestampExtractor;
private SourceNodeFactory(final String name,
final String[] topics,
final Pattern pattern,
final TimestampExtractor timestampExtractor,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valDeserializer) {
+ final Deserializer<KIn> keyDeserializer,
+ final Deserializer<VIn> valDeserializer) {
super(name, NO_PREDECESSORS);
this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>();
this.pattern = pattern;
@@ -269,7 +269,7 @@ public class InternalTopologyBuilder {
}
@Override
- public ProcessorNode<K, V> build() {
+ public ProcessorNode<KIn, VIn, KOut, VOut> build() {
return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);
}
@@ -283,18 +283,18 @@ public class InternalTopologyBuilder {
}
}
- private class SinkNodeFactory<K, V> extends NodeFactory<K, V> {
- private final Serializer<K> keySerializer;
- private final Serializer<V> valSerializer;
- private final StreamPartitioner<? super K, ? super V> partitioner;
- private final TopicNameExtractor<K, V> topicExtractor;
+ private class SinkNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
+ private final Serializer<KIn> keySerializer;
+ private final Serializer<VIn> valSerializer;
+ private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
+ private final TopicNameExtractor<KIn, VIn> topicExtractor;
private SinkNodeFactory(final String name,
final String[] predecessors,
- final TopicNameExtractor<K, V> topicExtractor,
- final Serializer<K> keySerializer,
- final Serializer<V> valSerializer,
- final StreamPartitioner<? super K, ? super V> partitioner) {
+ final TopicNameExtractor<KIn, VIn> topicExtractor,
+ final Serializer<KIn> keySerializer,
+ final Serializer<VIn> valSerializer,
+ final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
super(name, predecessors.clone());
this.topicExtractor = topicExtractor;
this.keySerializer = keySerializer;
@@ -303,9 +303,9 @@ public class InternalTopologyBuilder {
}
@Override
- public ProcessorNode<K, V> build() {
+ public ProcessorNode<KIn, VIn, KOut, VOut> build() {
if (topicExtractor instanceof StaticTopicNameExtractor) {
- final String topic = ((StaticTopicNameExtractor<K, V>) topicExtractor).topicName;
+ final String topic = ((StaticTopicNameExtractor<KIn, VIn>) topicExtractor).topicName;
if (internalTopicNamesWithProperties.containsKey(topic)) {
// prefix the internal topic name with the application id
return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
@@ -318,7 +318,7 @@ public class InternalTopologyBuilder {
}
@Override
- Sink<K, V> describe() {
+ Sink<KIn, VIn> describe() {
return new Sink<>(name, topicExtractor);
}
}
@@ -532,14 +532,14 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
- public final <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
- final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier<K, V> stateUpdateSupplier) {
+ public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer<KIn> keyDeserializer,
+ final Deserializer<VIn> valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier<KIn, VIn> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "store builder must not be null");
validateGlobalStoreArguments(sourceName,
topic,
@@ -552,7 +552,7 @@ public class InternalTopologyBuilder {
final String[] topics = {topic};
final String[] predecessors = {sourceName};
- final ProcessorNodeFactory<K, V> nodeFactory = new ProcessorNodeFactory<>(
+ final ProcessorNodeFactory<KIn, VIn, KOut, VOut> nodeFactory = new ProcessorNodeFactory<>(
processorName,
predecessors,
stateUpdateSupplier
@@ -715,9 +715,9 @@ public class InternalTopologyBuilder {
}
stateStoreFactory.users().add(processorName);
- final NodeFactory<?, ?> nodeFactory = nodeFactories.get(processorName);
+ final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
- final ProcessorNodeFactory<?, ?> processorNodeFactory = (ProcessorNodeFactory<?, ?>) nodeFactory;
+ final ProcessorNodeFactory<?, ?, ?, ?> processorNodeFactory = (ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory;
processorNodeFactory.addStateStore(stateStoreName);
connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
} else {
@@ -725,21 +725,21 @@ public class InternalTopologyBuilder {
}
}
- private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
- final Set<SourceNodeFactory<?, ?>> sourceNodes = new HashSet<>();
+ private Set<SourceNodeFactory<?, ?, ?, ?>> findSourcesForProcessorPredecessors(final String[] predecessors) {
+ final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodes = new HashSet<>();
for (final String predecessor : predecessors) {
- final NodeFactory<?, ?> nodeFactory = nodeFactories.get(predecessor);
+ final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(predecessor);
if (nodeFactory instanceof SourceNodeFactory) {
- sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
+ sourceNodes.add((SourceNodeFactory<?, ?, ?, ?>) nodeFactory);
} else if (nodeFactory instanceof ProcessorNodeFactory) {
- sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?>) nodeFactory).predecessors));
+ sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory).predecessors));
}
}
return sourceNodes;
}
- private <K, V> void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
- final ProcessorNodeFactory<K, V> processorNodeFactory) {
+ private <KIn, VIn, KOut, VOut> void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+ final ProcessorNodeFactory<KIn, VIn, KOut, VOut> processorNodeFactory) {
// we should never update the mapping from state store names to source topics if the store name already exists
// in the map; this scenario is possible, for example, that a state store underlying a source KTable is
// connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
@@ -751,10 +751,10 @@ public class InternalTopologyBuilder {
final Set<String> sourceTopics = new HashSet<>();
final Set<Pattern> sourcePatterns = new HashSet<>();
- final Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor =
+ final Set<SourceNodeFactory<?, ?, ?, ?>> sourceNodesForPredecessor =
findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
- for (final SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
+ for (final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
if (sourceNodeFactory.pattern != null) {
sourcePatterns.add(sourceNodeFactory.pattern);
} else {
@@ -763,13 +763,17 @@ public class InternalTopologyBuilder {
}
if (!sourceTopics.isEmpty()) {
- stateStoreNameToSourceTopics.put(stateStoreName,
- Collections.unmodifiableSet(sourceTopics));
+ stateStoreNameToSourceTopics.put(
+ stateStoreName,
+ Collections.unmodifiableSet(sourceTopics)
+ );
}
if (!sourcePatterns.isEmpty()) {
- stateStoreNameToSourceRegex.put(stateStoreName,
- Collections.unmodifiableSet(sourcePatterns));
+ stateStoreNameToSourceRegex.put(
+ stateStoreName,
+ Collections.unmodifiableSet(sourcePatterns)
+ );
}
}
@@ -880,40 +884,41 @@ public class InternalTopologyBuilder {
return globalGroups;
}
+ @SuppressWarnings("unchecked")
private ProcessorTopology build(final Set<String> nodeGroup) {
Objects.requireNonNull(applicationId, "topology has not completed optimization");
- final Map<String, ProcessorNode<?, ?>> processorMap = new LinkedHashMap<>();
- final Map<String, SourceNode<?, ?>> topicSourceMap = new HashMap<>();
- final Map<String, SinkNode<?, ?>> topicSinkMap = new HashMap<>();
+ final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap = new LinkedHashMap<>();
+ final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap = new HashMap<>();
+ final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap = new HashMap<>();
final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
final Set<String> repartitionTopics = new HashSet<>();
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
// also make sure the state store map values following the insertion ordering
- for (final NodeFactory<?, ?> factory : nodeFactories.values()) {
+ for (final NodeFactory<?, ?, ?, ?> factory : nodeFactories.values()) {
if (nodeGroup == null || nodeGroup.contains(factory.name)) {
- final ProcessorNode<?, ?> node = factory.build();
+ final ProcessorNode<?, ?, ?, ?> node = factory.build();
processorMap.put(node.name(), node);
if (factory instanceof ProcessorNodeFactory) {
buildProcessorNode(processorMap,
stateStoreMap,
- (ProcessorNodeFactory<?, ?>) factory,
- node);
+ (ProcessorNodeFactory<?, ?, ?, ?>) factory,
+ (ProcessorNode<Object, Object, Object, Object>) node);
} else if (factory instanceof SourceNodeFactory) {
buildSourceNode(topicSourceMap,
repartitionTopics,
- (SourceNodeFactory<?, ?>) factory,
- (SourceNode<?, ?>) node);
+ (SourceNodeFactory<?, ?, ?, ?>) factory,
+ (SourceNode<?, ?, ?, ?>) node);
} else if (factory instanceof SinkNodeFactory) {
buildSinkNode(processorMap,
topicSinkMap,
repartitionTopics,
- (SinkNodeFactory<?, ?>) factory,
- (SinkNode<?, ?>) node);
+ (SinkNodeFactory<?, ?, ?, ?>) factory,
+ (SinkNode<Object, Object, ?, ?>) node);
} else {
throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
}
@@ -929,14 +934,14 @@ public class InternalTopologyBuilder {
repartitionTopics);
}
- private void buildSinkNode(final Map<String, ProcessorNode<?, ?>> processorMap,
- final Map<String, SinkNode<?, ?>> topicSinkMap,
+ private void buildSinkNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
+ final Map<String, SinkNode<?, ?, ?, ?>> topicSinkMap,
final Set<String> repartitionTopics,
- final SinkNodeFactory<?, ?> sinkNodeFactory,
- final SinkNode<?, ?> node) {
+ final SinkNodeFactory<?, ?, ?, ?> sinkNodeFactory,
+ final SinkNode<Object, Object, ?, ?> node) {
- for (final String predecessor : sinkNodeFactory.predecessors) {
- processorMap.get(predecessor).addChild(node);
+ for (final String predecessorName : sinkNodeFactory.predecessors) {
+ getProcessor(processorMap, predecessorName).addChild(node);
if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) {
final String topic = ((StaticTopicNameExtractor<?, ?>) sinkNodeFactory.topicExtractor).topicName;
@@ -953,14 +958,22 @@ public class InternalTopologyBuilder {
}
}
- private void buildSourceNode(final Map<String, SourceNode<?, ?>> topicSourceMap,
+ @SuppressWarnings("unchecked")
+ private static <KIn, VIn, KOut, VOut> ProcessorNode<KIn, VIn, KOut, VOut> getProcessor(
+ final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
+ final String predecessor) {
+
+ return (ProcessorNode<KIn, VIn, KOut, VOut>) processorMap.get(predecessor);
+ }
+
+ private void buildSourceNode(final Map<String, SourceNode<?, ?, ?, ?>> topicSourceMap,
final Set<String> repartitionTopics,
- final SourceNodeFactory<?, ?> sourceNodeFactory,
- final SourceNode<?, ?> node) {
+ final SourceNodeFactory<?, ?, ?, ?> sourceNodeFactory,
+ final SourceNode<?, ?, ?, ?> node) {
final List<String> topics = (sourceNodeFactory.pattern != null) ?
- sourceNodeFactory.getTopics(subscriptionUpdates()) :
- sourceNodeFactory.topics;
+ sourceNodeFactory.getTopics(subscriptionUpdates()) :
+ sourceNodeFactory.topics;
for (final String topic : topics) {
if (internalTopicNamesWithProperties.containsKey(topic)) {
@@ -974,13 +987,13 @@ public class InternalTopologyBuilder {
}
}
- private void buildProcessorNode(final Map<String, ProcessorNode<?, ?>> processorMap,
+ private void buildProcessorNode(final Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
final Map<String, StateStore> stateStoreMap,
- final ProcessorNodeFactory<?, ?> factory,
- final ProcessorNode<?, ?> node) {
+ final ProcessorNodeFactory<?, ?, ?, ?> factory,
+ final ProcessorNode<Object, Object, Object, Object> node) {
for (final String predecessor : factory.predecessors) {
- final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor);
+ final ProcessorNode<Object, Object, Object, Object> predecessorNode = getProcessor(processorMap, predecessor);
predecessorNode.addChild(node);
}
for (final String stateStoreName : factory.stateStoreNames) {
@@ -1119,7 +1132,7 @@ public class InternalTopologyBuilder {
private void setRegexMatchedTopicsToSourceNodes() {
if (hasSubscriptionUpdates()) {
for (final String nodeName : nodeToSourcePatterns.keySet()) {
- final SourceNodeFactory<?, ?> sourceNode = (SourceNodeFactory<?, ?>) nodeFactories.get(nodeName);
+ final SourceNodeFactory<?, ?, ?, ?> sourceNode = (SourceNodeFactory<?, ?, ?, ?>) nodeFactories.get(nodeName);
final List<String> sourceTopics = sourceNode.getTopics(subscriptionUpdates);
//need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex
nodeToSourceTopics.put(nodeName, sourceTopics);
@@ -1306,10 +1319,10 @@ public class InternalTopologyBuilder {
}
private boolean isGlobalSource(final String nodeName) {
- final NodeFactory<?, ?> nodeFactory = nodeFactories.get(nodeName);
+ final NodeFactory<?, ?, ?, ?> nodeFactory = nodeFactories.get(nodeName);
if (nodeFactory instanceof SourceNodeFactory) {
- final List<String> topics = ((SourceNodeFactory<?, ?>) nodeFactory).topics;
+ final List<String> topics = ((SourceNodeFactory<?, ?, ?, ?>) nodeFactory).topics;
return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0));
}
return false;
@@ -1348,7 +1361,7 @@ public class InternalTopologyBuilder {
description.addGlobalStore(new GlobalStore(
node,
processorNode,
- ((ProcessorNodeFactory<?, ?>) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+ ((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
nodeToSourceTopics.get(node).get(0),
id
));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 2bc5891..559e3b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -65,7 +65,7 @@ public class PartitionGroup {
static class RecordInfo {
RecordQueue queue;
- ProcessorNode<?, ?> node() {
+ ProcessorNode<?, ?, ?, ?> node() {
return queue.source();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
new file mode 100644
index 0000000..c68829d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
+ private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
+
+ static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
+ if (delegate == null) {
+ return null;
+ } else {
+ return new ProcessorAdapter<>(delegate);
+ }
+ }
+
+ private ProcessorAdapter(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
+ this.delegate = delegate;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext<KOut, VOut> context) {
+ delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context));
+ }
+
+ @Override
+ public void process(final KIn key, final VIn value) {
+ delegate.process(key, value);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
new file mode 100644
index 0000000..d11aaf3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class ProcessorContextAdapter<KForward, VForward>
+ implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {
+
+ private final InternalProcessorContext delegate;
+
+ @SuppressWarnings("unchecked")
+ static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> shim(final InternalProcessorContext delegate) {
+ if (delegate instanceof ProcessorContextReverseAdapter) {
+ return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
+ } else {
+ return new ProcessorContextAdapter<>(delegate);
+ }
+ }
+
+ private ProcessorContextAdapter(final InternalProcessorContext delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String applicationId() {
+ return delegate.applicationId();
+ }
+
+ @Override
+ public TaskId taskId() {
+ return delegate.taskId();
+ }
+
+ @Override
+ public Serde<?> keySerde() {
+ return delegate.keySerde();
+ }
+
+ @Override
+ public Serde<?> valueSerde() {
+ return delegate.valueSerde();
+ }
+
+ @Override
+ public File stateDir() {
+ return delegate.stateDir();
+ }
+
+ @Override
+ public StreamsMetricsImpl metrics() {
+ return delegate.metrics();
+ }
+
+ @Override
+ public void setSystemTimeMs(final long timeMs) {
+ delegate.setSystemTimeMs(timeMs);
+ }
+
+ @Override
+ public long currentSystemTimeMs() {
+ return delegate.currentSystemTimeMs();
+ }
+
+ @Override
+ public ProcessorRecordContext recordContext() {
+ return delegate.recordContext();
+ }
+
+ @Override
+ public void setRecordContext(final ProcessorRecordContext recordContext) {
+ delegate.setRecordContext(recordContext);
+ }
+
+ @Override
+ public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
+ delegate.setCurrentNode(currentNode);
+ }
+
+ @Override
+ public ProcessorNode<?, ?, ?, ?> currentNode() {
+ return delegate.currentNode();
+ }
+
+ @Override
+ public ThreadCache cache() {
+ return delegate.cache();
+ }
+
+ @Override
+ public void initialize() {
+ delegate.initialize();
+ }
+
+ @Override
+ public void uninitialize() {
+ delegate.uninitialize();
+ }
+
+ @Override
+ public Task.TaskType taskType() {
+ return delegate.taskType();
+ }
+
+ @Override
+ public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
+ delegate.transitionToActive(streamTask, recordCollector, newCache);
+ }
+
+ @Override
+ public void transitionToStandby(final ThreadCache newCache) {
+ delegate.transitionToStandby(newCache);
+ }
+
+ @Override
+ public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
+ delegate.registerCacheFlushListener(namespace, listener);
+ }
+
+ @Override
+ public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
+ return delegate.getStateStore(builder);
+ }
+
+ @Override
+ public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
+ delegate.logChange(storeName, key, value, timestamp);
+ }
+
+ @Override
+ public String changelogFor(final String storeName) {
+ return delegate.changelogFor(storeName);
+ }
+
+ @Override
+ public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+ delegate.register(store, stateRestoreCallback);
+ }
+
+ @Override
+ public StateStore getStateStore(final String name) {
+ return delegate.getStateStore(name);
+ }
+
+ @Override
+ public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+ return delegate.schedule(interval, type, callback);
+ }
+
+ @Override
+ public <K extends KForward, V extends VForward> void forward(final K key, final V value) {
+ delegate.forward(key, value);
+ }
+
+ @Override
+ public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) {
+ delegate.forward(key, value, to);
+ }
+
+ @Override
+ public void commit() {
+ delegate.commit();
+ }
+
+ @Override
+ public String topic() {
+ return delegate.topic();
+ }
+
+ @Override
+ public int partition() {
+ return delegate.partition();
+ }
+
+ @Override
+ public long offset() {
+ return delegate.offset();
+ }
+
+ @Override
+ public Headers headers() {
+ return delegate.headers();
+ }
+
+ @Override
+ public long timestamp() {
+ return delegate.timestamp();
+ }
+
+ @Override
+ public Map<String, Object> appConfigs() {
+ return delegate.appConfigs();
+ }
+
+ @Override
+ public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+ return delegate.appConfigsWithPrefix(prefix);
+ }
+
+ InternalProcessorContext delegate() {
+ return delegate;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 6baef83..d743d72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -196,7 +196,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final V value,
final To to) {
throwUnsupportedOperationExceptionIfStandby("forward");
- final ProcessorNode<?, ?> previousNode = currentNode();
+ final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
final ProcessorRecordContext previousContext = recordContext;
try {
@@ -212,12 +212,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final String sendTo = toInternal.child();
if (sendTo == null) {
- final List<ProcessorNode<?, ?>> children = currentNode().children();
- for (final ProcessorNode<?, ?> child : children) {
- forward((ProcessorNode<K, V>) child, key, value);
+ final List<? extends ProcessorNode<?, ?, ?, ?>> children = currentNode().children();
+ for (final ProcessorNode<?, ?, ?, ?> child : children) {
+ forward((ProcessorNode<K, V, ?, ?>) child, key, value);
}
} else {
- final ProcessorNode<K, V> child = currentNode().getChild(sendTo);
+ final ProcessorNode<K, V, ?, ?> child = currentNode().getChild(sendTo);
if (child == null) {
throw new StreamsException("Unknown downstream node: " + sendTo
+ " either does not exist or is not connected to this processor.");
@@ -230,7 +230,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
- private <K, V> void forward(final ProcessorNode<K, V> child,
+ private <K, V> void forward(final ProcessorNode<K, V, ?, ?> child,
final K key,
final V value) {
setCurrentNode(child);
@@ -293,7 +293,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
- public ProcessorNode<?, ?> currentNode() {
+ public ProcessorNode<?, ?, ?, ?> currentNode() {
throwUnsupportedOperationExceptionIfStandby("currentNode");
return super.currentNode();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
new file mode 100644
index 0000000..6e82a5e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class ProcessorContextReverseAdapter implements InternalProcessorContext {
+ private final InternalApiProcessorContext<Object, Object> delegate;
+
+ static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) {
+ if (delegate instanceof ProcessorContextAdapter) {
+ return ((ProcessorContextAdapter<Object, Object>) delegate).delegate();
+ } else {
+ return new ProcessorContextReverseAdapter(delegate);
+ }
+ }
+
+ private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String applicationId() {
+ return delegate.applicationId();
+ }
+
+ @Override
+ public TaskId taskId() {
+ return delegate.taskId();
+ }
+
+ @Override
+ public Serde<?> keySerde() {
+ return delegate.keySerde();
+ }
+
+ @Override
+ public Serde<?> valueSerde() {
+ return delegate.valueSerde();
+ }
+
+ @Override
+ public File stateDir() {
+ return delegate.stateDir();
+ }
+
+ @Override
+ public StreamsMetricsImpl metrics() {
+ return delegate.metrics();
+ }
+
+ @Override
+ public void setSystemTimeMs(final long timeMs) {
+ delegate.setSystemTimeMs(timeMs);
+ }
+
+ @Override
+ public long currentSystemTimeMs() {
+ return delegate.currentSystemTimeMs();
+ }
+
+ @Override
+ public ProcessorRecordContext recordContext() {
+ return delegate.recordContext();
+ }
+
+ @Override
+ public void setRecordContext(final ProcessorRecordContext recordContext) {
+ delegate.setRecordContext(recordContext);
+ }
+
+ @Override
+ public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
+ delegate.setCurrentNode(currentNode);
+ }
+
+ @Override
+ public ProcessorNode<?, ?, ?, ?> currentNode() {
+ return delegate.currentNode();
+ }
+
+ @Override
+ public ThreadCache cache() {
+ return delegate.cache();
+ }
+
+ @Override
+ public void initialize() {
+ delegate.initialize();
+ }
+
+ @Override
+ public void uninitialize() {
+ delegate.uninitialize();
+ }
+
+ @Override
+ public Task.TaskType taskType() {
+ return delegate.taskType();
+ }
+
+ @Override
+ public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
+ delegate.transitionToActive(streamTask, recordCollector, newCache);
+ }
+
+ @Override
+ public void transitionToStandby(final ThreadCache newCache) {
+ delegate.transitionToStandby(newCache);
+ }
+
+ @Override
+ public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
+ delegate.registerCacheFlushListener(namespace, listener);
+ }
+
+ @Override
+ public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
+ return delegate.getStateStore(builder);
+ }
+
+ @Override
+ public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
+ delegate.logChange(storeName, key, value, timestamp);
+ }
+
+ @Override
+ public String changelogFor(final String storeName) {
+ return delegate.changelogFor(storeName);
+ }
+
+ @Override
+ public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+ delegate.register(store, stateRestoreCallback);
+ }
+
+ @Override
+ public StateStore getStateStore(final String name) {
+ return delegate.getStateStore(name);
+ }
+
+ @Deprecated
+ @Override
+ public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+ return delegate.schedule(Duration.ofMillis(intervalMs), type, callback);
+ }
+
+ @Override
+ public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
+ return delegate.schedule(interval, type, callback);
+ }
+
+ @Override
+ public <K, V> void forward(final K key, final V value) {
+ delegate.forward(key, value);
+ }
+
+ @Override
+ public <K, V> void forward(final K key, final V value, final To to) {
+ delegate.forward(key, value, to);
+ }
+
+ @Deprecated
+ @Override
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
+ delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name()));
+ }
+
+ @Deprecated
+ @Override
+ public <K, V> void forward(final K key, final V value, final String childName) {
+ delegate.forward(key, value, To.child(childName));
+ }
+
+ @Override
+ public void commit() {
+ delegate.commit();
+ }
+
+ @Override
+ public String topic() {
+ return delegate.topic();
+ }
+
+ @Override
+ public int partition() {
+ return delegate.partition();
+ }
+
+ @Override
+ public long offset() {
+ return delegate.offset();
+ }
+
+ @Override
+ public Headers headers() {
+ return delegate.headers();
+ }
+
+ @Override
+ public long timestamp() {
+ return delegate.timestamp();
+ }
+
+ @Override
+ public Map<String, Object> appConfigs() {
+ return delegate.appConfigs();
+ }
+
+ @Override
+ public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+ return delegate.appConfigsWithPrefix(prefix);
+ }
+
+ InternalApiProcessorContext<Object, Object> delegate() {
+ return delegate;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index a91eb6f..b489e62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -33,13 +33,13 @@ import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-public class ProcessorNode<K, V> {
+public class ProcessorNode<KIn, VIn, KOut, VOut> {
// TODO: 'children' can be removed when #forward() via index is removed
- private final List<ProcessorNode<?, ?>> children;
- private final Map<String, ProcessorNode<?, ?>> childByName;
+ private final List<ProcessorNode<KOut, VOut, ?, ?>> children;
+ private final Map<String, ProcessorNode<KOut, VOut, ?, ?>> childByName;
- private final Processor<K, V> processor;
+ private final Processor<KIn, VIn, KOut, VOut> processor;
private final String name;
private final Time time;
@@ -57,9 +57,12 @@ public class ProcessorNode<K, V> {
this(name, null, null);
}
- public ProcessorNode(final String name, final Processor<K, V> processor, final Set<String> stateStores) {
+ public ProcessorNode(final String name,
+ final org.apache.kafka.streams.processor.Processor<KIn, VIn> processor,
+ final Set<String> stateStores) {
+
this.name = name;
- this.processor = processor;
+ this.processor = ProcessorAdapter.adapt(processor);
this.children = new ArrayList<>();
this.childByName = new HashMap<>();
this.stateStores = stateStores;
@@ -70,11 +73,11 @@ public class ProcessorNode<K, V> {
return name;
}
- public final Processor<K, V> processor() {
+ public final Processor<KIn, VIn, KOut, VOut> processor() {
return processor;
}
- public List<ProcessorNode<?, ?>> children() {
+ public List<ProcessorNode<KOut, VOut, ?, ?>> children() {
return children;
}
@@ -82,7 +85,7 @@ public class ProcessorNode<K, V> {
return childByName.get(childName);
}
- public void addChild(final ProcessorNode<?, ?> child) {
+ public void addChild(final ProcessorNode<KOut, VOut, ?, ?> child) {
children.add(child);
childByName.put(child.name, child);
}
@@ -94,7 +97,7 @@ public class ProcessorNode<K, V> {
maybeMeasureLatency(
() -> {
if (processor != null) {
- processor.init(context);
+ processor.init(ProcessorContextAdapter.shim(context));
}
},
time,
@@ -137,7 +140,7 @@ public class ProcessorNode<K, V> {
}
- public void process(final K key, final V value) {
+ public void process(final KIn key, final VIn value) {
try {
maybeMeasureLatency(() -> processor.process(key, value), time, processSensor);
} catch (final ClassCastException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
index 405b1b6..5544adc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
@@ -21,6 +21,6 @@ import org.apache.kafka.streams.processor.Punctuator;
public interface ProcessorNodePunctuator {
- void punctuate(ProcessorNode<?, ?> node, long timestamp, PunctuationType type, Punctuator punctuator);
+ void punctuate(ProcessorNode<?, ?, ?, ?> node, long timestamp, PunctuationType type, Punctuator punctuator);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 64d2adc..75f3688 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -31,10 +31,10 @@ import org.slf4j.LoggerFactory;
public class ProcessorTopology {
private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class);
- private final List<ProcessorNode<?, ?>> processorNodes;
- private final Map<String, SourceNode<?, ?>> sourceNodesByName;
- private final Map<String, SourceNode<?, ?>> sourceNodesByTopic;
- private final Map<String, SinkNode<?, ?>> sinksByTopic;
+ private final List<ProcessorNode<?, ?, ?, ?>> processorNodes;
+ private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByName;
+ private final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic;
+ private final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic;
private final Set<String> terminalNodes;
private final List<StateStore> stateStores;
private final Set<String> repartitionTopics;
@@ -43,9 +43,9 @@ public class ProcessorTopology {
private final List<StateStore> globalStateStores;
private final Map<String, String> storeToChangelogTopic;
- public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?>> sourceNodesByTopic,
- final Map<String, SinkNode<?, ?>> sinksByTopic,
+ public ProcessorTopology(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+ final Map<String, SourceNode<?, ?, ?, ?>> sourceNodesByTopic,
+ final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic,
final List<StateStore> stateStores,
final List<StateStore> globalStateStores,
final Map<String, String> storeToChangelogTopic,
@@ -59,14 +59,14 @@ public class ProcessorTopology {
this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
this.terminalNodes = new HashSet<>();
- for (final ProcessorNode<?, ?> node : processorNodes) {
+ for (final ProcessorNode<?, ?, ?, ?> node : processorNodes) {
if (node.isTerminalNode()) {
terminalNodes.add(node.name());
}
}
this.sourceNodesByName = new HashMap<>();
- for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) {
+ for (final SourceNode<?, ?, ?, ?> source : sourceNodesByTopic.values()) {
sourceNodesByName.put(source.name(), source);
}
}
@@ -75,11 +75,11 @@ public class ProcessorTopology {
return sourceNodesByTopic.keySet();
}
- public SourceNode<?, ?> source(final String topic) {
+ public SourceNode<?, ?, ?, ?> source(final String topic) {
return sourceNodesByTopic.get(topic);
}
- public Set<SourceNode<?, ?>> sources() {
+ public Set<SourceNode<?, ?, ?, ?>> sources() {
return new HashSet<>(sourceNodesByTopic.values());
}
@@ -87,7 +87,7 @@ public class ProcessorTopology {
return sinksByTopic.keySet();
}
- public SinkNode<?, ?> sink(final String topic) {
+ public SinkNode<?, ?, ?, ?> sink(final String topic) {
return sinksByTopic.get(topic);
}
@@ -95,7 +95,7 @@ public class ProcessorTopology {
return terminalNodes;
}
- public List<ProcessorNode<?, ?>> processors() {
+ public List<ProcessorNode<?, ?, ?, ?>> processors() {
return processorNodes;
}
@@ -163,13 +163,13 @@ public class ProcessorTopology {
}
}
- private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) {
+ private String childrenToString(final String indent, final List<? extends ProcessorNode<?, ?, ?, ?>> children) {
if (children == null || children.isEmpty()) {
return "";
}
final StringBuilder sb = new StringBuilder(indent + "\tchildren:\t[");
- for (final ProcessorNode<?, ?> child : children) {
+ for (final ProcessorNode<?, ?, ?, ?> child : children) {
sb.append(child.name());
sb.append(", ");
}
@@ -177,7 +177,7 @@ public class ProcessorTopology {
sb.append("]\n");
// recursively print children
- for (final ProcessorNode<?, ?> child : children) {
+ for (final ProcessorNode<?, ?, ?, ?> child : children) {
sb.append(child.toString(indent)).append(childrenToString(indent, child.children()));
}
return sb.toString();
@@ -199,10 +199,10 @@ public class ProcessorTopology {
* @return A string representation of this instance.
*/
public String toString(final String indent) {
- final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
- for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) {
+ final Map<SourceNode<?, ?, ?, ?>, List<String>> sourceToTopics = new HashMap<>();
+ for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) {
final String topic = sourceNodeEntry.getKey();
- final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+ final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getValue();
sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
sourceToTopics.get(source).add(topic);
}
@@ -210,8 +210,8 @@ public class ProcessorTopology {
final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
// start from sources
- for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
- final SourceNode<?, ?> source = sourceNodeEntry.getKey();
+ for (final Map.Entry<SourceNode<?, ?, ?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
+ final SourceNode<?, ?, ?, ?> source = sourceNodeEntry.getKey();
final List<String> topics = sourceNodeEntry.getValue();
sb.append(source.toString(indent + "\t"))
.append(topicsToString(indent + "\t", topics))
@@ -234,7 +234,7 @@ public class ProcessorTopology {
// for testing only
public Set<String> processorConnectedStateStores(final String processorName) {
- for (final ProcessorNode<?, ?> node : processorNodes) {
+ for (final ProcessorNode<?, ?, ?, ?> node : processorNodes) {
if (node.name().equals(processorName)) {
return node.stateStores;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 86b1b44..269dff2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -29,11 +29,11 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXC
class RecordDeserializer {
private final Logger log;
- private final SourceNode<?, ?> sourceNode;
+ private final SourceNode<?, ?, ?, ?> sourceNode;
private final Sensor droppedRecordsSensor;
private final DeserializationExceptionHandler deserializationExceptionHandler;
- RecordDeserializer(final SourceNode<?, ?> sourceNode,
+ RecordDeserializer(final SourceNode<?, ?, ?, ?> sourceNode,
final DeserializationExceptionHandler deserializationExceptionHandler,
final LogContext logContext,
final Sensor droppedRecordsSensor) {
@@ -98,7 +98,7 @@ class RecordDeserializer {
}
}
- SourceNode<?, ?> sourceNode() {
+ SourceNode<?, ?, ?, ?> sourceNode() {
return sourceNode;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 6f0db8a..df7e834 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -39,7 +39,7 @@ public class RecordQueue {
public static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
private final Logger log;
- private final SourceNode<?, ?> source;
+ private final SourceNode<?, ?, ?, ?> source;
private final TopicPartition partition;
private final ProcessorContext processorContext;
private final TimestampExtractor timestampExtractor;
@@ -52,7 +52,7 @@ public class RecordQueue {
private final Sensor droppedRecordsSensor;
RecordQueue(final TopicPartition partition,
- final SourceNode<?, ?> source,
+ final SourceNode<?, ?, ?, ?> source,
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler deserializationExceptionHandler,
final InternalProcessorContext processorContext,
@@ -85,7 +85,7 @@ public class RecordQueue {
*
* @return SourceNode
*/
- public SourceNode<?, ?> source() {
+ public SourceNode<?, ?, ?, ?> source() {
return source;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 9b0a254..a77deb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -22,20 +22,20 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
-public class SinkNode<K, V> extends ProcessorNode<K, V> {
+public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
- private Serializer<K> keySerializer;
- private Serializer<V> valSerializer;
- private final TopicNameExtractor<K, V> topicExtractor;
- private final StreamPartitioner<? super K, ? super V> partitioner;
+ private Serializer<KIn> keySerializer;
+ private Serializer<VIn> valSerializer;
+ private final TopicNameExtractor<KIn, VIn> topicExtractor;
+ private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
private InternalProcessorContext context;
SinkNode(final String name,
- final TopicNameExtractor<K, V> topicExtractor,
- final Serializer<K> keySerializer,
- final Serializer<V> valSerializer,
- final StreamPartitioner<? super K, ? super V> partitioner) {
+ final TopicNameExtractor<KIn, VIn> topicExtractor,
+ final Serializer<KIn> keySerializer,
+ final Serializer<VIn> valSerializer,
+ final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
super(name);
this.topicExtractor = topicExtractor;
@@ -48,7 +48,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
* @throws UnsupportedOperationException if this method adds a child to a sink node
*/
@Override
- public void addChild(final ProcessorNode<?, ?> child) {
+ public void addChild(final ProcessorNode<KOut, VOut, ?, ?> child) {
throw new UnsupportedOperationException("sink node does not allow addChild");
}
@@ -60,10 +60,10 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
// if serializers are null, get the default ones from the context
if (keySerializer == null) {
- keySerializer = (Serializer<K>) context.keySerde().serializer();
+ keySerializer = (Serializer<KIn>) context.keySerde().serializer();
}
if (valSerializer == null) {
- valSerializer = (Serializer<V>) context.valueSerde().serializer();
+ valSerializer = (Serializer<VIn>) context.valueSerde().serializer();
}
// if serializers are internal wrapping serializers that may need to be given the default serializer
@@ -78,7 +78,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
@Override
- public void process(final K key, final V value) {
+ public void process(final KIn key, final VIn value) {
final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
final long timestamp = context.timestamp();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 8508a7d..6dbdfee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -23,18 +23,18 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
-public class SourceNode<K, V> extends ProcessorNode<K, V> {
+public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
private InternalProcessorContext context;
- private Deserializer<K> keyDeserializer;
- private Deserializer<V> valDeserializer;
+ private Deserializer<KIn> keyDeserializer;
+ private Deserializer<VIn> valDeserializer;
private final TimestampExtractor timestampExtractor;
private Sensor processAtSourceSensor;
public SourceNode(final String name,
final TimestampExtractor timestampExtractor,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valDeserializer) {
+ final Deserializer<KIn> keyDeserializer,
+ final Deserializer<VIn> valDeserializer) {
super(name);
this.timestampExtractor = timestampExtractor;
this.keyDeserializer = keyDeserializer;
@@ -42,16 +42,16 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
}
public SourceNode(final String name,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valDeserializer) {
+ final Deserializer<KIn> keyDeserializer,
+ final Deserializer<VIn> valDeserializer) {
this(name, null, keyDeserializer, valDeserializer);
}
- K deserializeKey(final String topic, final Headers headers, final byte[] data) {
+ KIn deserializeKey(final String topic, final Headers headers, final byte[] data) {
return keyDeserializer.deserialize(topic, headers, data);
}
- V deserializeValue(final String topic, final Headers headers, final byte[] data) {
+ VIn deserializeValue(final String topic, final Headers headers, final byte[] data) {
return valDeserializer.deserialize(topic, headers, data);
}
@@ -74,10 +74,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
// if deserializers are null, get the default ones from the context
if (this.keyDeserializer == null) {
- this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
+ this.keyDeserializer = (Deserializer<KIn>) context.keySerde().deserializer();
}
if (this.valDeserializer == null) {
- this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+ this.valDeserializer = (Deserializer<VIn>) context.valueSerde().deserializer();
}
// if deserializers are internal wrapping deserializers that may need to be given the default
@@ -92,7 +92,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
@Override
- public void process(final K key, final V value) {
+ public void process(final KIn key, final VIn value) {
context.forward(key, value);
processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index abdd567..a046c0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -158,7 +158,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
);
}
- for (final ProcessorNode<?, ?> sourceNode : topology.sources()) {
+ for (final ProcessorNode<?, ?, ?, ?> sourceNode : topology.sources()) {
final String sourceNodeName = sourceNode.name();
e2eLatencySensors.put(
sourceNodeName,
@@ -296,7 +296,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// close the processors
// make sure close() is called for each node even when there is a RuntimeException
RuntimeException exception = null;
- for (final ProcessorNode<?, ?> node : topology.processors()) {
+ for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
processorContext.setCurrentNode(node);
try {
node.close();
@@ -669,7 +669,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
try {
// process the record by passing to the source node of the topology
- final ProcessorNode<Object, Object> currNode = (ProcessorNode<Object, Object>) recordInfo.node();
+ final ProcessorNode<Object, Object, Object, Object> currNode = (ProcessorNode<Object, Object, Object, Object>) recordInfo.node();
final TopicPartition partition = recordInfo.partition();
log.trace("Start processing one record [{}]", record);
@@ -738,7 +738,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
@Override
- public void punctuate(final ProcessorNode<?, ?> node,
+ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
final long timestamp,
final PunctuationType type,
final Punctuator punctuator) {
@@ -763,7 +763,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
- private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?> currNode, final long wallClockTime) {
+ private void updateProcessorContext(final StampedRecord record, final ProcessorNode<?, ?, ?, ?> currNode, final long wallClockTime) {
processorContext.setRecordContext(
new ProcessorRecordContext(
record.timestamp,
@@ -843,7 +843,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private void initializeTopology() {
// initialize the task by initializing all its processor nodes in the topology
log.trace("Initializing processor nodes of the topology");
- for (final ProcessorNode<?, ?> node : topology.processors()) {
+ for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
processorContext.setCurrentNode(node);
try {
node.init(processorContext);
@@ -1108,7 +1108,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
public RecordQueue createQueue(final TopicPartition partition) {
- final SourceNode<?, ?> source = topology.source(partition.topic());
+ final SourceNode<?, ?, ?, ?> source = topology.source(partition.topic());
final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor();
final TimestampExtractor timestampExtractor = sourceTimestampExtractor != null ? sourceTimestampExtractor : defaultTimestampExtractor;
return new RecordQueue(
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index bdc60a7..ecf23ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -851,7 +851,7 @@ public class StreamsBuilderTest {
}
private static void assertNamesForOperation(final ProcessorTopology topology, final String... expected) {
- final List<ProcessorNode<?, ?>> processors = topology.processors();
+ final List<ProcessorNode<?, ?, ?, ?>> processors = topology.processors();
assertEquals("Invalid number of expected processors", expected.length, processors.size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processors.get(i).name());
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
index 69c3f38..e91b007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -54,13 +54,13 @@ public class TopologyTestDriverWrapper extends TopologyTestDriver {
* @param name the name to search for
* @return the processor matching the search name
*/
- public ProcessorNode<?, ?> getProcessor(final String name) {
- for (final ProcessorNode<?, ?> node : processorTopology.processors()) {
+ public ProcessorNode<?, ?, ?, ?> getProcessor(final String name) {
+ for (final ProcessorNode<?, ?, ?, ?> node : processorTopology.processors()) {
if (node.name().equals(name)) {
return node;
}
}
- for (final ProcessorNode<?, ?> node : globalTopology.processors()) {
+ for (final ProcessorNode<?, ?, ?, ?> node : globalTopology.processors()) {
if (node.name().equals(name)) {
return node;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 17047a1..4b3df44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -1580,9 +1580,9 @@ public class KStreamImplTest {
final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
- final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
+ final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
- for (final SourceNode<?, ?> sourceNode : topology.sources()) {
+ for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
if (sourceNode.name().equals(originalSourceNode.name())) {
assertNull(sourceNode.getTimestampExtractor());
} else {
@@ -1609,9 +1609,9 @@ public class KStreamImplTest {
final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
- final SourceNode<?, ?> originalSourceNode = topology.source("topic-1");
+ final SourceNode<?, ?, ?, ?> originalSourceNode = topology.source("topic-1");
- for (final SourceNode<?, ?> sourceNode : topology.sources()) {
+ for (final SourceNode<?, ?, ?, ?> sourceNode : topology.sources()) {
if (sourceNode.name().equals(originalSourceNode.name())) {
assertNull(sourceNode.getTimestampExtractor());
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 7119d5b..ad8cd0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -30,9 +30,6 @@ import org.hamcrest.core.IsInstanceOf;
import org.junit.Before;
import org.junit.Test;
-import java.util.Collections;
-
-import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
@@ -54,7 +51,7 @@ public class GlobalProcessorContextImplTest {
private GlobalProcessorContextImpl globalContext;
- private ProcessorNode<?, ?> child;
+ private ProcessorNode<Object, Object, Object, Object> child;
private ProcessorRecordContext recordContext;
@Before
@@ -82,20 +79,12 @@ public class GlobalProcessorContextImplTest {
null,
null);
- final ProcessorNode<?, ?> processorNode = mock(ProcessorNode.class);
- globalContext.setCurrentNode(processorNode);
+ final ProcessorNode<Object, Object, Object, Object> processorNode = new ProcessorNode<>("testNode");
child = mock(ProcessorNode.class);
+ processorNode.addChild(child);
- expect(processorNode.children())
- .andReturn(Collections.singletonList(child))
- .anyTimes();
- expect(processorNode.getChild(CHILD_PROCESSOR))
- .andReturn(child);
- expect(processorNode.getChild(anyString()))
- .andReturn(null);
- replay(processorNode);
-
+ globalContext.setCurrentNode(processorNode);
recordContext = mock(ProcessorRecordContext.class);
globalContext.setRecordContext(recordContext);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 42d3fc9..6ce2bae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -58,14 +58,14 @@ public class GlobalStateTaskTest {
private final String topic2 = "t2";
private final TopicPartition t1 = new TopicPartition(topic1, 1);
private final TopicPartition t2 = new TopicPartition(topic2, 1);
- private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(
+ private final MockSourceNode<String, String, ?, ?> sourceOne = new MockSourceNode<>(
new StringDeserializer(),
new StringDeserializer());
- private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode<>(
+ private final MockSourceNode<Integer, Integer, ?, ?> sourceTwo = new MockSourceNode<>(
new IntegerDeserializer(),
new IntegerDeserializer());
- private final MockProcessorNode<?, ?> processorOne = new MockProcessorNode<>();
- private final MockProcessorNode<?, ?> processorTwo = new MockProcessorNode<>();
+ private final MockProcessorNode<?, ?, ?, ?> processorOne = new MockProcessorNode<>();
+ private final MockProcessorNode<?, ?, ?, ?> processorTwo = new MockProcessorNode<>();
private final Map<TopicPartition, Long> offsets = new HashMap<>();
private File testDirectory = TestUtils.tempDirectory("global-store");
@@ -78,7 +78,7 @@ public class GlobalStateTaskTest {
@Before
public void before() {
final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
- final Map<String, SourceNode<?, ?>> sourceByTopics = new HashMap<>();
+ final Map<String, SourceNode<?, ?, ?, ?>> sourceByTopics = new HashMap<>();
sourceByTopics.put(topic1, sourceOne);
sourceByTopics.put(topic2, sourceTwo);
final Map<String, String> storeToTopic = new HashMap<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 9bcd47e..5e3c2dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -733,9 +733,9 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(null);
}
- private Set<String> nodeNames(final Collection<ProcessorNode<?, ?>> nodes) {
+ private Set<String> nodeNames(final Collection<ProcessorNode<?, ?, ?, ?>> nodes) {
final Set<String> nodeNames = new HashSet<>();
- for (final ProcessorNode<?, ?> node : nodes) {
+ for (final ProcessorNode<?, ?, ?, ?> node : nodes) {
nodeNames.add(node.name());
}
return nodeNames;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index d3b05bc..32e498c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -150,7 +150,7 @@ public class ProcessorContextImplTest {
((InternalProcessorContext) context).transitionToActive(task, null, null);
EasyMock.expect(task.recordCollector()).andStubReturn(recordCollector);
- context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
+ context.setCurrentNode(new ProcessorNode<String, Long, Object, Object>("fake", null,
new HashSet<>(asList(
"LocalKeyValueStore",
"LocalTimestampedKeyValueStore",
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 91fa83e..08125e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -109,7 +109,7 @@ public class ProcessorNodeTest {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion);
final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
- final ProcessorNode<Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+ final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
node.init(context);
final String threadId = Thread.currentThread().getName();
@@ -198,7 +198,7 @@ public class ProcessorNodeTest {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST);
final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
- final ProcessorNode<Object, Object> node = new ProcessorNode<Object, Object>("name", new ClassCastProcessor(), Collections.<String>emptySet());
+ final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
node.init(context);
final StreamsException se = assertThrows(StreamsException.class, () -> node.process("aKey", "aValue"));
assertThat(se.getCause(), instanceOf(ClassCastException.class));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
index 25ea3ef..b4cef8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
@@ -26,8 +26,8 @@ public final class ProcessorTopologyFactories {
private ProcessorTopologyFactories() {}
- public static ProcessorTopology with(final List<ProcessorNode<?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?>> sourcesByTopic,
+ public static ProcessorTopology with(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+ final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
final List<StateStore> stateStoresByName,
final Map<String, String> storeToChangelogTopic) {
return new ProcessorTopology(processorNodes,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 964f352..55043e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -28,7 +28,7 @@ import static org.junit.Assert.assertEquals;
public class PunctuationQueueTest {
- private final MockProcessorNode<String, String> node = new MockProcessorNode<>();
+ private final MockProcessorNode<String, String, ?, ?> node = new MockProcessorNode<>();
private final PunctuationQueue queue = new PunctuationQueue();
private final Punctuator punctuator = new Punctuator() {
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 7299067..4ae9d32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -68,7 +68,7 @@ public class RecordDeserializerTest {
assertEquals(rawRecord.headers(), record.headers());
}
- static class TheSourceNode extends SourceNode<Object, Object> {
+ static class TheSourceNode extends SourceNode<Object, Object, Object, Object> {
private final boolean keyThrowsException;
private final boolean valueThrowsException;
private final Object key;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 6929bb8e..895e045 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -64,7 +64,7 @@ public class RecordQueueTest {
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
new MockRecordCollector()
);
- private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
+ private final MockSourceNode<Integer, Integer, ?, ?> mockSourceNodeWithMetrics
= new MockSourceNode<>(intDeserializer, intDeserializer);
private final RecordQueue queue = new RecordQueue(
new TopicPartition("topic", 1),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 0c303d2..6234b0f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -33,12 +33,12 @@ public class SinkNodeTest {
private final Serializer<byte[]> anySerializer = Serdes.ByteArray().serializer();
private final RecordCollector recordCollector = new MockRecordCollector();
private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, recordCollector);
- private final SinkNode<byte[], byte[]> sink = new SinkNode<>("anyNodeName",
+ private final SinkNode<byte[], byte[], ?, ?> sink = new SinkNode<>("anyNodeName",
new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null);
// Used to verify that the correct exceptions are thrown if the compiler checks are bypassed
@SuppressWarnings("unchecked")
- private final SinkNode<Object, Object> illTypedSink = (SinkNode) sink;
+ private final SinkNode<Object, Object, ?, ?> illTypedSink = (SinkNode) sink;
@Before
public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 028c4fe..f565895 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
public class SourceNodeTest {
@Test
public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
- final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+ final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
final RecordHeaders headers = new RecordHeaders();
final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
assertThat(deserializeKey, is("topic" + headers + "data"));
@@ -51,7 +51,7 @@ public class SourceNodeTest {
@Test
public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
- final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
+ final SourceNode<String, String, ?, ?> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
final RecordHeaders headers = new RecordHeaders();
final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
assertThat(deserializedValue, is("topic" + headers + "data"));
@@ -83,7 +83,7 @@ public class SourceNodeTest {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion);
final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
- final SourceNode<String, String> node =
+ final SourceNode<String, String, ?, ?> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
node.init(context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index facbf2e..ea20955 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -127,9 +127,9 @@ public class StreamTaskTest {
private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
- private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
- private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
- private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
+ private final MockSourceNode<Integer, Integer, Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+ private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+ private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) {
@Override
public void process(final Integer key, final Integer value) {
throw new RuntimeException("KABOOM!");
@@ -140,8 +140,8 @@ public class StreamTaskTest {
throw new RuntimeException("KABOOM!");
}
};
- private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L);
- private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
+ private final MockProcessorNode<Integer, Integer, ?, ?> processorStreamTime = new MockProcessorNode<>(10L);
+ private final MockProcessorNode<Integer, Integer, ?, ?> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
private final String storeName = "store";
private final MockKeyValueStore stateStore = new MockKeyValueStore(storeName, false);
@@ -175,8 +175,8 @@ public class StreamTaskTest {
}
};
- private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?>> sourcesByTopic,
+ private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+ final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic,
final Set<String> repartitionTopics) {
return new ProcessorTopology(processorNodes,
sourcesByTopic,
@@ -187,8 +187,8 @@ public class StreamTaskTest {
repartitionTopics);
}
- private static ProcessorTopology withSources(final List<ProcessorNode<?, ?>> processorNodes,
- final Map<String, SourceNode<?, ?>> sourcesByTopic) {
+ private static ProcessorTopology withSources(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
+ final Map<String, SourceNode<?, ?, ?, ?>> sourcesByTopic) {
return new ProcessorTopology(processorNodes,
sourcesByTopic,
Collections.emptyMap(),
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 4c782a4..2552fb4 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -298,12 +298,12 @@ public class InternalMockProcessorContext
if (toInternal.hasTimestamp()) {
setTime(toInternal.timestamp());
}
- final ProcessorNode<?, ?> thisNode = currentNode;
+ final ProcessorNode<?, ?, ?, ?> thisNode = currentNode;
try {
- for (final ProcessorNode<?, ?> childNode : thisNode.children()) {
+ for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
currentNode = childNode;
- ((ProcessorNode<Object, Object>) childNode).process(key, value);
+ ((ProcessorNode<Object, Object, ?, ?>) childNode).process(key, value);
toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple
// Processors and toInternal might have been modified
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 718e5af..9a8b407 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -23,12 +23,12 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
-public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
+public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> {
private static final String NAME = "MOCK-PROCESS-";
private static final AtomicInteger INDEX = new AtomicInteger(1);
- public final MockProcessor<K, V> mockProcessor;
+ public final MockProcessor<KIn, VIn> mockProcessor;
public boolean closed;
public boolean initialized;
@@ -38,14 +38,14 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
}
public MockProcessorNode(final long scheduleInterval, final PunctuationType punctuationType) {
- this(new MockProcessor<K, V>(punctuationType, scheduleInterval));
+ this(new MockProcessor<>(punctuationType, scheduleInterval));
}
public MockProcessorNode() {
- this(new MockProcessor<K, V>());
+ this(new MockProcessor<>());
}
- private MockProcessorNode(final MockProcessor<K, V> mockProcessor) {
+ private MockProcessorNode(final MockProcessor<KIn, VIn> mockProcessor) {
super(NAME + INDEX.getAndIncrement(), mockProcessor, Collections.<String>emptySet());
this.mockProcessor = mockProcessor;
@@ -58,7 +58,7 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
}
@Override
- public void process(final K key, final V value) {
+ public void process(final KIn key, final VIn value) {
processor().process(key, value);
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index f582202..5130d46 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-public class MockSourceNode<K, V> extends SourceNode<K, V> {
+public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> {
private static final String NAME = "MOCK-SOURCE-";
private static final AtomicInteger INDEX = new AtomicInteger(1);
public int numReceived = 0;
- public final ArrayList<K> keys = new ArrayList<>();
- public final ArrayList<V> values = new ArrayList<>();
+ public final ArrayList<KIn> keys = new ArrayList<>();
+ public final ArrayList<VIn> values = new ArrayList<>();
public boolean initialized;
public boolean closed;
- public MockSourceNode(final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) {
+ public MockSourceNode(final Deserializer<KIn> keyDeserializer, final Deserializer<VIn> valDeserializer) {
super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer);
}
@Override
- public void process(final K key, final V value) {
+ public void process(final KIn key, final VIn value) {
this.numReceived++;
this.keys.add(key);
this.values.add(value);