You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/16 23:02:20 UTC
[kafka] branch trunk updated: KAFKA-6454: Allow timestamp
manipulation in Processor API (#4519)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 394aa74 KAFKA-6454: Allow timestamp manipulation in Processor API (#4519)
394aa74 is described below
commit 394aa7426117d0d04666c1c2a63d5f98229b7894
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 16 16:02:11 2018 -0700
KAFKA-6454: Allow timestamp manipulation in Processor API (#4519)
Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
docs/streams/core-concepts.html | 4 +
docs/streams/developer-guide/processor-api.html | 4 +
docs/streams/upgrade-guide.html | 8 +
.../apache/kafka/streams/kstream/Transformer.java | 16 +-
.../kafka/streams/kstream/ValueTransformer.java | 20 ++-
.../streams/kstream/ValueTransformerWithKey.java | 15 +-
.../streams/kstream/internals/KStreamBranch.java | 15 +-
.../streams/kstream/internals/KStreamImpl.java | 15 +-
.../kstream/internals/KStreamTransformValues.java | 8 +
.../kafka/streams/processor/AbstractProcessor.java | 6 +-
.../kafka/streams/processor/ProcessorContext.java | 39 +++--
.../org/apache/kafka/streams/processor/To.java | 68 +++++++++
.../internals/AbstractProcessorContext.java | 15 --
.../internals/GlobalProcessorContextImpl.java | 30 +++-
.../processor/internals/ProcessorContextImpl.java | 67 ++++++---
.../streams/processor/internals/ProcessorNode.java | 11 +-
.../internals/ProcessorRecordContext.java | 6 +-
.../streams/processor/internals/RecordContext.java | 5 +
.../processor/internals/StandbyContextImpl.java | 11 ++
.../{RecordContext.java => ToInternal.java} | 42 +++---
.../streams/state/internals/LRUCacheEntry.java | 7 +-
.../internals/KStreamTransformValuesTest.java | 17 ++-
.../internals/AbstractProcessorContextTest.java | 22 +--
.../processor/internals/ProcessorTopologyTest.java | 163 +++++++++++++--------
.../processor/internals/RecordContextStub.java | 13 +-
.../apache/kafka/test/MockProcessorContext.java | 46 +++---
.../apache/kafka/test/NoOpProcessorContext.java | 6 +
27 files changed, 459 insertions(+), 220 deletions(-)
diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index 2f22be7..889fe06 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -127,6 +127,10 @@
<li> For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.</li>
</ul>
+ <p>
+ Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling <code>#forward()</code>.
+ </p>
+
<h3><a id="streams_state" href="#streams_state">States</a></h3>
<p>
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index fdf6c86..b51bc22 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -77,6 +77,10 @@
its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).</p>
+ <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
+ (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
+ (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
+ Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
<p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
for the punctuation scheduling: either <a class="reference internal" href="../concepts.html#streams-concepts-time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 46be969..baf9633 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -85,6 +85,14 @@
In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
to let users specify inner serdes if the default serde classes are windowed serdes.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
+ /<p>
+
+ <p>
+ Kafka 1.2.0 allows to manipulate timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
+ To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
+ The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
+ The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
+ Forwarding based on child index is not supported in the new API any longer.
</p>
<h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 308fcad..a83b4a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
/**
* The {@code Transformer} interface is for stateful mapping of an input record to zero, one, or multiple new output
@@ -69,9 +70,8 @@ public interface Transformer<K, V, R> {
* attached} to this operator can be accessed and modified
* arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}).
* <p>
- * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)},
- * {@link ProcessorContext#forward(Object, Object, int)}, and
- * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+ * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)}
+ * and {@link ProcessorContext#forward(Object, Object, To)} can be used.
* If not record should be forwarded downstream, {@code transform} can return {@code null}.
*
* @param key the key for the record
@@ -86,9 +86,8 @@ public interface Transformer<K, V, R> {
* {@link ProcessorContext#schedule(long) schedules itself} with the context during
* {@link #init(ProcessorContext) initialization}.
* <p>
- * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
- * {@link ProcessorContext#forward(Object, Object, int)}, and
- * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+ * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
+ * {@link ProcessorContext#forward(Object, Object, To)} can be used.
* <p>
* Note that {@code punctuate} is called based on <it>stream time</it> (i.e., time progresses with regard to
* timestamps return by the used {@link TimestampExtractor})
@@ -105,9 +104,8 @@ public interface Transformer<K, V, R> {
/**
* Close this processor and clean up any resources.
* <p>
- * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
- * {@link ProcessorContext#forward(Object, Object, int)}, and
- * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+ * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
+ * {@link ProcessorContext#forward(Object, Object, To)} can be used.
*/
void close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1802a61..1da779e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
/**
* The {@code ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type).
@@ -58,9 +59,8 @@ public interface ValueTransformer<V, VR> {
* Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
* Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}.
* <p>
- * Note that using {@link ProcessorContext#forward(Object, Object)},
- * {@link ProcessorContext#forward(Object, Object, int)}, or
- * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
+ * Note that using {@link ProcessorContext#forward(Object, Object)} or
+ * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of
* {@code ValueTransformer} and will result in an {@link StreamsException exception}.
*
* @param context the context
@@ -75,9 +75,8 @@ public interface ValueTransformer<V, VR> {
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
- * Note, that using {@link ProcessorContext#forward(Object, Object)},
- * {@link ProcessorContext#forward(Object, Object, int)}, and
- * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
+ * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+ * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
* will result in an {@link StreamsException exception}.
*
* @param value the value to be transformed
@@ -90,9 +89,8 @@ public interface ValueTransformer<V, VR> {
* the context during {@link #init(ProcessorContext) initialization}.
* <p>
* It is not possible to return any new output records within {@code punctuate}.
- * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
- * or {@link ProcessorContext#forward(Object, Object, String)} will result in an
- * {@link StreamsException exception}.
+ * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
+ * will result in an {@link StreamsException exception}.
* Furthermore, {@code punctuate} must return {@code null}.
* <p>
* Note, that {@code punctuate} is called base on <it>stream time</it> (i.e., time progress with regard to
@@ -111,8 +109,8 @@ public interface ValueTransformer<V, VR> {
* Close this processor and clean up any resources.
* <p>
* It is not possible to return any new output records within {@code close()}.
- * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
- * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
+ * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
+ * will result in an {@link StreamsException exception}.
*/
void close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 128c61f..7f399b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
/**
* The {@code ValueTransformerWithKey} interface for stateful mapping of a value to a new value (with possible new type).
@@ -62,9 +63,8 @@ public interface ValueTransformerWithKey<K, V, VR> {
* Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
* Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}.
* <p>
- * Note that using {@link ProcessorContext#forward(Object, Object)},
- * {@link ProcessorContext#forward(Object, Object, int)}, or
- * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
+ * Note that using {@link ProcessorContext#forward(Object, Object)} or
+ * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of
* {@code ValueTransformerWithKey} and will result in an {@link StreamsException exception}.
*
* @param context the context
@@ -79,9 +79,8 @@ public interface ValueTransformerWithKey<K, V, VR> {
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
- * Note, that using {@link ProcessorContext#forward(Object, Object)},
- * {@link ProcessorContext#forward(Object, Object, int)}, and
- * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
+ * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+ * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
* will result in an {@link StreamsException exception}.
*
* @param readOnlyKey the read-only key
@@ -94,8 +93,8 @@ public interface ValueTransformerWithKey<K, V, VR> {
* Close this processor and clean up any resources.
* <p>
* It is not possible to return any new output records within {@code close()}.
- * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
- * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
+ * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)},
+ * will result in an {@link StreamsException exception}.
*/
void close();
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index 317c5bf..baa9b63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -16,18 +16,21 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.To;
class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
private final Predicate<K, V>[] predicates;
+ private final String[] childNodes;
- @SuppressWarnings("unchecked")
- public KStreamBranch(Predicate<K, V> ... predicates) {
+ KStreamBranch(final Predicate<K, V>[] predicates,
+ final String[] childNodes) {
this.predicates = predicates;
+ this.childNodes = childNodes;
}
@Override
@@ -37,12 +40,12 @@ class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
@Override
- public void process(K key, V value) {
+ public void process(final K key, final V value) {
for (int i = 0; i < predicates.length; i++) {
if (predicates[i].test(key, value)) {
- // use forward with childIndex here and then break the loop
+ // use forward with child here and then break the loop
// so that no record is going to be piped to multiple streams
- context().forward(key, value, i);
+ context().forward(key, value, To.child(childNodes[i]));
break;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 07bc67d..349be86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -358,17 +358,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
for (final Predicate<? super K, ? super V> predicate : predicates) {
Objects.requireNonNull(predicate, "predicates can't have null values");
}
- String branchName = builder.newProcessorName(BRANCH_NAME);
- builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
+ String branchName = builder.newProcessorName(BRANCH_NAME);
- KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+ String[] childNames = new String[predicates.length];
for (int i = 0; i < predicates.length; i++) {
- String childName = builder.newProcessorName(BRANCHCHILD_NAME);
+ childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
+ }
- builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
+ builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone(), childNames), this.name);
- branchChildren[i] = new KStreamImpl<>(builder, childName, sourceNodes, this.repartitionRequired);
+ KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+ for (int i = 0; i < predicates.length; i++) {
+ builder.internalTopologyBuilder.addProcessor(childNames[i], new KStreamPassThrough<K, V>(), branchName);
+ branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired);
}
return branchChildren;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index ace4f69..e644597 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import java.io.File;
import java.util.Map;
@@ -117,10 +118,17 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
}
@Override
+ public <K, V> void forward(final K key, final V value, final To to) {
+ throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
}
+ @SuppressWarnings("deprecation")
@Override
public <K, V> void forward(final K key, final V value, final String childName) {
throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 1cfe78a..14e6c2a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -31,7 +31,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
}
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
this.context = context;
}
@@ -46,7 +46,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
*/
@SuppressWarnings("deprecation")
@Override
- public void punctuate(long timestamp) {
+ public void punctuate(final long timestamp) {
// do nothing
}
@@ -67,6 +67,6 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
* @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
*/
protected final ProcessorContext context() {
- return this.context;
+ return context;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 42902a8..404b225 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -83,7 +83,9 @@ public interface ProcessorContext {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
+ void register(final StateStore store,
+ final boolean loggingEnabledIsDeprecatedAndIgnored,
+ final StateRestoreCallback stateRestoreCallback);
/**
* Get the state store given the store name.
@@ -91,7 +93,7 @@ public interface ProcessorContext {
* @param name The store name
* @return The state store instance
*/
- StateStore getStateStore(String name);
+ StateStore getStateStore(final String name);
/**
* Schedules a periodic operation for processors. A processor may call this method during
@@ -125,7 +127,9 @@ public interface ProcessorContext {
* @param callback a function consuming timestamps representing the current stream or system time
* @return a handle allowing cancellation of the punctuation schedule established by this method
*/
- Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
+ Cancellable schedule(final long intervalMs,
+ final PunctuationType type,
+ final Punctuator callback);
/**
* Schedules a periodic operation for processors. A processor may call this method during
@@ -137,30 +141,47 @@ public interface ProcessorContext {
* @param interval the time interval between punctuations
*/
@Deprecated
- void schedule(long interval);
+ void schedule(final long interval);
/**
- * Forwards a key/value pair to the downstream processors
+ * Forwards a key/value pair to all downstream processors.
+ * Used the input record's timestamp as timestamp for the output record.
+ *
* @param key key
* @param value value
*/
- <K, V> void forward(K key, V value);
+ <K, V> void forward(final K key, final V value);
+
+ /**
+ * Forwards a key/value pair to the specified downstream processors.
+ * Can be used to set the timestamp of the output record.
+ *
+ * @param key key
+ * @param value value
+ * @param to the options to use when forwarding
+ */
+ <K, V> void forward(final K key, final V value, final To to);
/**
* Forwards a key/value pair to one of the downstream processors designated by childIndex
* @param key key
* @param value value
* @param childIndex index in list of children of this node
+ * @deprecated please use {@link #forward(Object, Object, To)} instead
*/
- <K, V> void forward(K key, V value, int childIndex);
+ // TODO when we remove this method, we can also remove `ProcessorNode#children`
+ @Deprecated
+ <K, V> void forward(final K key, final V value, final int childIndex);
/**
* Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
* @param key key
* @param value value
* @param childName name of downstream processor
+ * @deprecated please use {@link #forward(Object, Object, To)} instead
*/
- <K, V> void forward(K key, V value, String childName);
+ @Deprecated
+ <K, V> void forward(final K key, final V value, final String childName);
/**
* Requests a commit
@@ -231,6 +252,6 @@ public interface ProcessorContext {
* @return the key/values matching the given prefix from the StreamsConfig properties.
*
*/
- Map<String, Object> appConfigsWithPrefix(String prefix);
+ Map<String, Object> appConfigsWithPrefix(final String prefix);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
new file mode 100644
index 0000000..52007df
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * This class is used to provide the optional parameters when sending output records to downstream processor
+ * using {@link ProcessorContext#forward(Object, Object, To)}.
+ */
+public class To {
+ protected String childName;
+ protected long timestamp;
+
+ private To(final String childName,
+ final long timestamp) {
+ this.childName = childName;
+ this.timestamp = timestamp;
+ }
+
+ protected To(final To to) {
+ this(to.childName, to.timestamp);
+ }
+
+ protected void update(final To to) {
+ childName = to.childName;
+ timestamp = to.timestamp;
+ }
+
+ /**
+ * Forward the key/value pair to one of the downstream processors designated by the downstream processor name.
+ * @param childName name of downstream processor
+ * @return a new {@link To} instance configured with {@code childName}
+ */
+ public static To child(final String childName) {
+ return new To(childName, -1);
+ }
+
+ /**
+ * Forward the key/value pair to all downstream processors
+ * @return a new {@link To} instance configured for all downstream processor
+ */
+ public static To all() {
+ return new To((String) null, -1);
+ }
+
+ /**
+ * Set the timestamp of the output record.
+ * @param timestamp the output record timestamp
+ * @return itself (i.e., {@code this})
+ */
+ public To withTimestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index e9b5a4c..87408c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -164,20 +163,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
return combined;
}
- @SuppressWarnings("unchecked")
- @Override
- public <K, V> void forward(final K key, final V value) {
- final ProcessorNode previousNode = currentNode();
- try {
- for (final ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
- setCurrentNode(child);
- child.process(key, value);
- }
- } finally {
- setCurrentNode(previousNode);
- }
- }
-
@Override
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return config.originalsWithPrefix(prefix);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 37e7cb5..88d9f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -23,8 +23,11 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import java.util.List;
+
public class GlobalProcessorContextImpl extends AbstractProcessorContext {
@@ -40,20 +43,43 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
return stateManager.getGlobalStore(name);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <K, V> void forward(final K key, final V value) {
+ final ProcessorNode previousNode = currentNode();
+ try {
+ for (final ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) {
+ setCurrentNode(child);
+ child.process(key, value);
+ }
+ } finally {
+ setCurrentNode(previousNode);
+ }
+ }
+
/**
* @throws UnsupportedOperationException on every invocation
*/
@Override
- public <K, V> void forward(K key, V value, int childIndex) {
+ public <K, V> void forward(final K key, final V value, final To to) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
}
+ /**
+ * @throws UnsupportedOperationException on every invocation
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public <K, V> void forward(final K key, final V value, final int childIndex) {
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
+ }
/**
* @throws UnsupportedOperationException on every invocation
*/
+ @SuppressWarnings("deprecation")
@Override
- public <K, V> void forward(K key, V value, String childName) {
+ public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 42d3d70..3761bfb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -18,12 +18,13 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.List;
@@ -32,6 +33,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
private final StreamTask task;
private final RecordCollector collector;
+ private final ToInternal toInternal = new ToInternal();
+ private final static To SEND_TO_ALL = To.all();
ProcessorContextImpl(final TaskId id,
final StreamTask task,
@@ -77,32 +80,60 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@SuppressWarnings("unchecked")
@Override
+ public <K, V> void forward(final K key, final V value) {
+ forward(key, value, SEND_TO_ALL);
+ }
+
+ @SuppressWarnings({"unchecked", "deprecation"})
+ @Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
+ forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
+ }
+
+ @SuppressWarnings({"unchecked", "deprecation"})
+ @Override
+ public <K, V> void forward(final K key, final V value, final String childName) {
+ forward(key, value, To.child(childName));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <K, V> void forward(final K key, final V value, final To to) {
+ toInternal.update(to);
+ if (toInternal.hasTimestamp()) {
+ recordContext.setTimestamp(toInternal.timestamp());
+ }
final ProcessorNode previousNode = currentNode();
- final ProcessorNode child = (ProcessorNode<K, V>) currentNode().children().get(childIndex);
- setCurrentNode(child);
try {
- child.process(key, value);
+ final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children();
+ final String sendTo = toInternal.child();
+ if (sendTo != null) {
+ final ProcessorNode child = currentNode().getChild(sendTo);
+ if (child == null) {
+ throw new StreamsException("Unknown processor name: " + sendTo);
+ }
+ forward(child, key, value);
+ } else {
+ if (children.size() == 1) {
+ final ProcessorNode child = children.get(0);
+ forward(child, key, value);
+ } else {
+ for (final ProcessorNode child : children) {
+ forward(child, key, value);
+ }
+ }
+ }
} finally {
setCurrentNode(previousNode);
}
}
@SuppressWarnings("unchecked")
- @Override
- public <K, V> void forward(final K key, final V value, final String childName) {
- for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) {
- if (child.name().equals(childName)) {
- ProcessorNode previousNode = currentNode();
- setCurrentNode(child);
- try {
- child.process(key, value);
- return;
- } finally {
- setCurrentNode(previousNode);
- }
- }
- }
+ private <K, V> void forward(final ProcessorNode child,
+ final K key,
+ final V value) {
+ setCurrentNode(child);
+ child.process(key, value);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 29f442f..94e8640 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -26,12 +26,16 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class ProcessorNode<K, V> {
+ // TODO: 'children' can be removed when #forward() via index is removed
private final List<ProcessorNode<?, ?>> children;
+ private final Map<String, ProcessorNode<?, ?>> childByName;
private final String name;
private final Processor<K, V> processor;
@@ -75,6 +79,7 @@ public class ProcessorNode<K, V> {
this.name = name;
this.processor = processor;
this.children = new ArrayList<>();
+ this.childByName = new HashMap<>();
this.stateStores = stateStores;
this.time = new SystemTime();
}
@@ -92,11 +97,15 @@ public class ProcessorNode<K, V> {
return children;
}
+ public final ProcessorNode getChild(final String childName) {
+ return childByName.get(childName);
+ }
+
public void addChild(ProcessorNode<?, ?> child) {
children.add(child);
+ childByName.put(child.name, child);
}
-
public void init(ProcessorContext context) {
this.context = context;
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index aa20103..92acfc9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -20,7 +20,7 @@ import java.util.Objects;
public class ProcessorRecordContext implements RecordContext {
- private final long timestamp;
+ private long timestamp;
private final long offset;
private final String topic;
private final int partition;
@@ -44,6 +44,10 @@ public class ProcessorRecordContext implements RecordContext {
return timestamp;
}
+ public void setTimestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ }
+
@Override
public String topic() {
return topic;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
index dc752cb..dd58f4c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
@@ -34,6 +34,11 @@ public interface RecordContext {
long timestamp();
/**
+ * Sets a new timestamp for the output record.
+ */
+ void setTimestamp(final long timestamp);
+
+ /**
* @return The topic the record was received on
*/
String topic();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index e38b821..360c4ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.Collections;
@@ -142,6 +143,15 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
* @throws UnsupportedOperationException on every invocation
*/
@Override
+ public <K, V> void forward(final K key, final V value, final To to) {
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
+ }
+
+ /**
+ * @throws UnsupportedOperationException on every invocation
+ */
+ @SuppressWarnings("deprecation")
+ @Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@@ -149,6 +159,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
/**
* @throws UnsupportedOperationException on every invocation
*/
+ @SuppressWarnings("deprecation")
@Override
public <K, V> void forward(final K key, final V value, final String childName) {
throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
similarity index 59%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
index dc752cb..6c5798e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
@@ -16,30 +16,26 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.To;
-/**
- * The context associated with the current record being processed by
- * an {@link Processor}
- */
-public interface RecordContext {
- /**
- * @return The offset of the original record received from Kafka
- */
- long offset();
+public class ToInternal extends To {
+ public ToInternal() {
+ super(To.all());
+ }
+
+ public void update(final To to) {
+ super.update(to);
+ }
- /**
- * @return The timestamp extracted from the record received from Kafka
- */
- long timestamp();
+ public boolean hasTimestamp() {
+ return timestamp != -1;
+ }
- /**
- * @return The topic the record was received on
- */
- String topic();
+ public long timestamp() {
+ return timestamp;
+ }
- /**
- * @return The partition the record was received on
- */
- int partition();
-}
+ public String child() {
+ return childName;
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index dedb906..af7059b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -27,7 +27,7 @@ class LRUCacheEntry implements RecordContext {
private final long offset;
private final String topic;
private final int partition;
- private final long timestamp;
+ private long timestamp;
private long sizeBytes;
private boolean isDirty;
@@ -64,6 +64,11 @@ class LRUCacheEntry implements RecordContext {
}
@Override
+ public void setTimestamp(final long timestamp) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public String topic() {
return topic;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 1b34fab..dc0b886 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -23,18 +23,19 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Rule;
import org.junit.Test;
-import static org.junit.Assert.fail;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
public class KStreamTransformValuesTest {
@@ -192,6 +193,13 @@ public class KStreamTransformValuesTest {
}
try {
+ transformValueProcessor.process(null, 3);
+ fail("should not allow call to context.forward() within ValueTransformer");
+ } catch (final StreamsException e) {
+ // expected
+ }
+
+ try {
transformValueProcessor.punctuate(0);
fail("should not allow ValueTransformer#puntuate() to return not-null value");
} catch (final StreamsException e) {
@@ -213,11 +221,14 @@ public class KStreamTransformValuesTest {
context.forward(null, null);
}
if (value == 1) {
- context.forward(null, null, null);
+ context.forward(null, null, (String) null);
}
if (value == 2) {
context.forward(null, null, 0);
}
+ if (value == 3) {
+ context.forward(null, null, To.all());
+ }
throw new RuntimeException("Should never happen in this test");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 46c23c6..aac275d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockStateStore;
@@ -176,28 +177,21 @@ public class AbstractProcessorContextTest {
}
@Override
- public void schedule(final long interval) {
-
- }
+ public void schedule(final long interval) {}
@Override
- public <K, V> void forward(final K key, final V value) {
-
- }
+ public <K, V> void forward(final K key, final V value) {}
@Override
- public <K, V> void forward(final K key, final V value, final int childIndex) {
-
- }
+ public <K, V> void forward(final K key, final V value, final To to) {}
@Override
- public <K, V> void forward(final K key, final V value, final String childName) {
-
- }
+ public <K, V> void forward(final K key, final V value, final int childIndex) {}
@Override
- public void commit() {
+ public <K, V> void forward(final K key, final V value, final String childName) {}
- }
+ @Override
+ public void commit() {}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index da2e5dc..d07274a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -72,8 +73,8 @@ public class ProcessorTopologyTest {
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
- File localState = TestUtils.tempDirectory();
- Properties props = new Properties();
+ final File localState = TestUtils.tempDirectory();
+ final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
@@ -120,8 +121,8 @@ public class ProcessorTopologyTest {
}
@Test
- public void testDrivingSimpleTopology() throws Exception {
- int partition = 10;
+ public void testDrivingSimpleTopology() {
+ final int partition = 10;
driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -142,7 +143,7 @@ public class ProcessorTopologyTest {
@Test
- public void testDrivingMultiplexingTopology() throws Exception {
+ public void testDrivingMultiplexingTopology() {
driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
@@ -164,7 +165,7 @@ public class ProcessorTopologyTest {
}
@Test
- public void testDrivingMultiplexByNameTopology() throws Exception {
+ public void testDrivingMultiplexByNameTopology() {
driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
@@ -186,8 +187,8 @@ public class ProcessorTopologyTest {
}
@Test
- public void testDrivingStatefulTopology() throws Exception {
- String storeName = "entries";
+ public void testDrivingStatefulTopology() {
+ final String storeName = "entries";
driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -195,7 +196,7 @@ public class ProcessorTopologyTest {
driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
assertNoOutputRecord(OUTPUT_TOPIC_1);
- KeyValueStore<String, String> store = driver.getKeyValueStore("entries");
+ final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
assertEquals("value4", store.get("key1"));
assertEquals("value2", store.get("key2"));
assertEquals("value3", store.get("key3"));
@@ -205,15 +206,16 @@ public class ProcessorTopologyTest {
@SuppressWarnings("unchecked")
@Test
public void shouldDriveGlobalStore() {
- final StateStoreSupplier storeSupplier = Stores.create("my-store")
+ final String storeName = "my-store";
+ final StateStoreSupplier storeSupplier = Stores.create(storeName)
.withStringKeys().withStringValues().inMemory().disableLogging().build();
final String global = "global";
final String topic = "topic";
final TopologyBuilder topologyBuilder = this.builder
- .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
+ .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder);
- final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+ final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get(storeName);
driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
assertEquals("value1", globalStore.get("key1"));
@@ -221,8 +223,8 @@ public class ProcessorTopologyTest {
}
@Test
- public void testDrivingSimpleMultiSourceTopology() throws Exception {
- int partition = 10;
+ public void testDrivingSimpleMultiSourceTopology() {
+ final int partition = 10;
driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -235,7 +237,7 @@ public class ProcessorTopologyTest {
}
@Test
- public void testDrivingForwardToSourceTopology() throws Exception {
+ public void testDrivingForwardToSourceTopology() {
driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -246,7 +248,7 @@ public class ProcessorTopologyTest {
}
@Test
- public void testDrivingInternalRepartitioningTopology() throws Exception {
+ public void testDrivingInternalRepartitioningTopology() {
driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -257,7 +259,7 @@ public class ProcessorTopologyTest {
}
@Test
- public void testDrivingInternalRepartitioningForwardingTimestampTopology() throws Exception {
+ public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -315,7 +317,7 @@ public class ProcessorTopologyTest {
}
@Test
- public void shouldConsiderTimeStamps() throws Exception {
+ public void shouldConsiderTimeStamps() {
final int partition = 10;
driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
@@ -326,6 +328,17 @@ public class ProcessorTopologyTest {
assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L);
}
+ @Test
+ public void shouldConsiderModifiedTimeStamps() {
+ final int partition = 10;
+ driver = new ProcessorTopologyTestDriver(config, createTimestampTopology(partition).internalTopologyBuilder);
+ driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
+ driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
+ driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 30L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
+ }
private void assertNextOutputRecord(final String topic,
final String key,
@@ -345,7 +358,7 @@ public class ProcessorTopologyTest {
final String value,
final Integer partition,
final Long timestamp) {
- ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
+ final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
assertEquals(topic, record.topic());
assertEquals(key, record.key());
assertEquals(value, record.value());
@@ -353,51 +366,63 @@ public class ProcessorTopologyTest {
assertEquals(timestamp, record.timestamp());
}
- private void assertNoOutputRecord(String topic) {
+ private void assertNoOutputRecord(final String topic) {
assertNull(driver.readOutput(topic));
}
private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
return new StreamPartitioner<Object, Object>() {
@Override
- public Integer partition(Object key, Object value, int numPartitions) {
+ public Integer partition(final Object key, final Object value, final int numPartitions) {
return partition;
}
};
}
- private TopologyBuilder createSimpleTopology(int partition) {
- return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
- .addProcessor("processor", define(new ForwardingProcessor()), "source")
- .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+ private TopologyBuilder createSimpleTopology(final int partition) {
+ return builder
+ .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ .addProcessor("processor", define(new ForwardingProcessor()), "source")
+ .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+ }
+
+ private TopologyBuilder createTimestampTopology(final int partition) {
+ return builder
+ .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ .addProcessor("processor", define(new TimestampProcessor()), "source")
+ .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
}
private TopologyBuilder createMultiplexingTopology() {
- return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
- .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
- .addSink("sink1", OUTPUT_TOPIC_1, "processor")
- .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+ return builder
+ .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
+ .addSink("sink1", OUTPUT_TOPIC_1, "processor")
+ .addSink("sink2", OUTPUT_TOPIC_2, "processor");
}
private TopologyBuilder createMultiplexByNameTopology() {
- return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ return builder
+ .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
.addSink("sink0", OUTPUT_TOPIC_1, "processor")
.addSink("sink1", OUTPUT_TOPIC_2, "processor");
}
- private TopologyBuilder createStatefulTopology(String storeName) {
- return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
- .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
- .addStateStore(
- Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
- "processor"
- )
- .addSink("counts", OUTPUT_TOPIC_1, "processor");
+ private TopologyBuilder createStatefulTopology(final String storeName) {
+ return builder
+ .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+ .addStateStore(
+ Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
+ "processor"
+ )
+ .addSink("counts", OUTPUT_TOPIC_1, "processor");
}
private TopologyBuilder createInternalRepartitioningTopology() {
- return builder.addSource("source", INPUT_TOPIC_1)
+ return builder
+ .addSource("source", INPUT_TOPIC_1)
.addInternalTopic(THROUGH_TOPIC_1)
.addSink("sink0", THROUGH_TOPIC_1, "source")
.addSource("source1", THROUGH_TOPIC_1)
@@ -405,12 +430,13 @@ public class ProcessorTopologyTest {
}
private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
- return builder.addSource("source", INPUT_TOPIC_1)
- .addInternalTopic(THROUGH_TOPIC_1)
- .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
- .addSink("sink0", THROUGH_TOPIC_1, "processor")
- .addSource("source1", THROUGH_TOPIC_1)
- .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+ return builder
+ .addSource("source", INPUT_TOPIC_1)
+ .addInternalTopic(THROUGH_TOPIC_1)
+ .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
+ .addSink("sink0", THROUGH_TOPIC_1, "processor")
+ .addSource("source1", THROUGH_TOPIC_1)
+ .addSink("sink1", OUTPUT_TOPIC_1, "source1");
}
private TopologyBuilder createForwardToSourceTopology() {
@@ -434,26 +460,34 @@ public class ProcessorTopologyTest {
* A processor that simply forwards all messages to all children.
*/
protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
-
@Override
- public void process(String key, String value) {
+ public void process(final String key, final String value) {
context().forward(key, value);
}
@Override
- public void punctuate(long streamTime) {
+ public void punctuate(final long streamTime) {
context().forward(Long.toString(streamTime), "punctuate");
}
}
/**
+ * A processor that simply forwards all messages to all children with advanced timestamps.
+ */
+ protected static class TimestampProcessor extends AbstractProcessor<String, String> {
+ @Override
+ public void process(final String key, final String value) {
+ context().forward(key, value, To.all().withTimestamp(context().timestamp() + 10));
+ }
+ }
+
+ /**
* A processor that removes custom timestamp information from messages and forwards modified messages to each child.
* A message contains custom timestamp information if the value is in ".*@[0-9]+" format.
*/
protected static class ValueTimestampProcessor extends AbstractProcessor<String, String> {
-
@Override
- public void process(String key, String value) {
+ public void process(final String key, final String value) {
context().forward(key, value.split("@")[0]);
}
}
@@ -462,22 +496,23 @@ public class ProcessorTopologyTest {
* A processor that forwards slightly-modified messages to each child.
*/
protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
-
private final int numChildren;
- public MultiplexingProcessor(int numChildren) {
+ MultiplexingProcessor(final int numChildren) {
this.numChildren = numChildren;
}
+ @SuppressWarnings("deprecation")
@Override
- public void process(String key, String value) {
+ public void process(final String key, final String value) {
for (int i = 0; i != numChildren; ++i) {
context().forward(key, value + "(" + (i + 1) + ")", i);
}
}
+ @SuppressWarnings("deprecation")
@Override
- public void punctuate(long streamTime) {
+ public void punctuate(final long streamTime) {
for (int i = 0; i != numChildren; ++i) {
context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
}
@@ -489,22 +524,23 @@ public class ProcessorTopologyTest {
* Note: the children are assumed to be named "sink{child number}", e.g., sink1, or sink2, etc.
*/
protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
-
private final int numChildren;
- public MultiplexByNameProcessor(int numChildren) {
+ MultiplexByNameProcessor(final int numChildren) {
this.numChildren = numChildren;
}
+ @SuppressWarnings("deprecation")
@Override
- public void process(String key, String value) {
+ public void process(final String key, final String value) {
for (int i = 0; i != numChildren; ++i) {
- context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+ context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
}
}
+ @SuppressWarnings("deprecation")
@Override
- public void punctuate(long streamTime) {
+ public void punctuate(final long streamTime) {
for (int i = 0; i != numChildren; ++i) {
context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
}
@@ -516,28 +552,27 @@ public class ProcessorTopologyTest {
* {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
*/
protected static class StatefulProcessor extends AbstractProcessor<String, String> {
-
private KeyValueStore<String, String> store;
private final String storeName;
- public StatefulProcessor(String storeName) {
+ StatefulProcessor(final String storeName) {
this.storeName = storeName;
}
@Override
@SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
super.init(context);
store = (KeyValueStore<String, String>) context.getStateStore(storeName);
}
@Override
- public void process(String key, String value) {
+ public void process(final String key, final String value) {
store.put(key, value);
}
@Override
- public void punctuate(long streamTime) {
+ public void punctuate(final long streamTime) {
int count = 0;
try (KeyValueIterator<String, String> iter = store.all()) {
while (iter.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
index 7932d1f..0af5e17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -19,14 +19,18 @@ package org.apache.kafka.streams.processor.internals;
public class RecordContextStub implements RecordContext {
private final long offset;
- private final long timestamp;
+ private long timestamp;
private final int partition;
private final String topic;
public RecordContextStub() {
this(-1, -1, -1, "");
}
- public RecordContextStub(final long offset, final long timestamp, final int partition, final String topic) {
+
+ public RecordContextStub(final long offset,
+ final long timestamp,
+ final int partition,
+ final String topic) {
this.offset = offset;
this.timestamp = timestamp;
this.partition = partition;
@@ -44,6 +48,11 @@ public class RecordContextStub implements RecordContext {
}
@Override
+ public void setTimestamp(final long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
public String topic() {
return topic;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 06137fb..6b0cb66 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -28,12 +28,14 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -53,6 +55,7 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
private final RecordCollector.Supplier recordCollectorSupplier;
private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
+ private final ToInternal toInternal = new ToInternal();
private Serde<?> keySerde;
private Serde<?> valSerde;
@@ -179,44 +182,39 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
@Override
@SuppressWarnings("unchecked")
public <K, V> void forward(final K key, final V value) {
- final ProcessorNode thisNode = currentNode;
- for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
- currentNode = childNode;
- try {
- childNode.process(key, value);
- } finally {
- currentNode = thisNode;
- }
- }
+ forward(key, value, To.all());
}
@Override
@SuppressWarnings("unchecked")
public <K, V> void forward(final K key, final V value, final int childIndex) {
- final ProcessorNode thisNode = currentNode;
- final ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
- currentNode = childNode;
- try {
- childNode.process(key, value);
- } finally {
- currentNode = thisNode;
- }
+ forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
}
@Override
@SuppressWarnings("unchecked")
public <K, V> void forward(final K key, final V value, final String childName) {
+ forward(key, value, To.child(childName));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <K, V> void forward(final K key, final V value, final To to) {
+ toInternal.update(to);
+ if (toInternal.hasTimestamp()) {
+ setTime(toInternal.timestamp());
+ }
final ProcessorNode thisNode = currentNode;
- for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
- if (childNode.name().equals(childName)) {
- currentNode = childNode;
- try {
+ try {
+ for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+ if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
+ currentNode = childNode;
childNode.process(key, value);
- } finally {
- currentNode = thisNode;
+ toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple Processors and toInternal might have been modified
}
- break;
}
+ } finally {
+ currentNode = thisNode;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index afa0639..6b5d47a 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -65,6 +66,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
}
@Override
+ public <K, V> void forward(final K key, final V value, final To to) {
+ forwardedValues.put(key, value);
+ }
+
+ @Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
forward(key, value);
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.