Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/31 13:30:02 UTC
[kafka] branch trunk updated: KAFKA-6958: Overload KStream methods
to allow to name operation name using the new Named class (#6411)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78c55c8 KAFKA-6958: Overload KStream methods to allow to name operation name using the new Named class (#6411)
78c55c8 is described below
commit 78c55c8d66f5570d975caa53a9751b126ca10538
Author: Florian Hussonnois <fl...@gmail.com>
AuthorDate: Fri May 31 15:29:43 2019 +0200
KAFKA-6958: Overload KStream methods to allow to name operation name using the new Named class (#6411)
Sub-task required to allow defining custom processor names with the KStreams DSL (KIP-307):
- overload methods for stateless operations to accept a Named parameter (filter, filterNot, map, mapValues, foreach, peek, branch, transform, transformValues, flatTransform)
- overload the process method to accept a Named parameter
- overload the join/leftJoin/outerJoin methods
Reviewers: John Roesler <jo...@confluent.io>, Boyang Chen <bo...@confluent.io>,
Bill Bejeck <bb...@gmail.com>
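
As a reading aid (not part of the patch), here is a minimal sketch of what the new overloads enable, assuming the Kafka Streams 2.3 API introduced by this commit; the topic names and processor names are made up for illustration:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

public class NamedOverloadExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // Each stateless operation can now carry an explicit processor name
        // via the new Named parameter, instead of a generated one such as
        // KSTREAM-FILTER-0000000001.
        input.filter((key, value) -> value != null, Named.as("drop-null-values"))
             .mapValues(value -> value.toUpperCase(), Named.as("uppercase-values"))
             .peek((key, value) -> System.out.println(key + "=" + value), Named.as("debug-logger"))
             .to("output-topic");

        // The chosen names appear in the topology description.
        System.out.println(builder.build().describe());
    }
}
```

Stable processor names of this kind also keep topology-internal names (and thus internal topic names) from shifting when operators are added or removed upstream.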
---
.../org/apache/kafka/streams/kstream/KStream.java | 1229 ++++++++++++++++++--
.../streams/kstream/internals/KStreamImpl.java | 374 ++++--
.../streams/kstream/internals/NamedInternal.java | 19 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 245 +++-
.../kstream/RepartitionTopicNamingTest.java | 66 +-
5 files changed, 1694 insertions(+), 239 deletions(-)
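
One more hedged sketch before the diff itself: branch() is the one overload where Named comes first, because the predicates are a trailing varargs (topic and processor names below are hypothetical):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

public class BranchNamingExample {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("events");

        // Named must precede the predicates, since Predicate... is varargs.
        KStream<String, String>[] branches = input.branch(
                Named.as("split-by-prefix"),
                (key, value) -> value.startsWith("a"),  // branches[0]
                (key, value) -> true);                  // branches[1]: everything else

        branches[0].to("a-events");
        branches[1].to("other-events");
    }
}
```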
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index cd64f75..8313add 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -67,6 +67,19 @@ public interface KStream<K, V> {
KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
/**
+ * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
+ * All records that do not satisfy the predicate are dropped.
+ * This is a stateless record-by-record operation.
+ *
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @return a {@code KStream} that contains only those records that satisfy the given predicate
+ * @see #filterNot(Predicate)
+ */
+ KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
+
+
+ /**
* Create a new {@code KStream} that consists of all records of this stream which do <em>not</em> satisfy the given
* predicate.
* All records that <em>do</em> satisfy the predicate are dropped.
@@ -79,6 +92,19 @@ public interface KStream<K, V> {
KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
/**
+ * Create a new {@code KStream} that consists of all records of this stream which do <em>not</em> satisfy the given
+ * predicate.
+ * All records that <em>do</em> satisfy the predicate are dropped.
+ * This is a stateless record-by-record operation.
+ *
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @return a {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate
+ * @see #filter(Predicate)
+ */
+ KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
+
+ /**
* Set a new key (with possibly new type) for each input record.
* The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
@@ -111,6 +137,40 @@ public interface KStream<K, V> {
<KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
/**
+ * Set a new key (with possibly new type) for each input record.
+ * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} by
+ * extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
+ * length of the value string.
+ * <pre>{@code
+ * KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
+ * KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer>() {
+ * Integer apply(Byte[] key, String value) {
+ * return value.length();
+ * }
+ * });
+ * }</pre>
+ * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
+ * join) is applied to the result {@code KStream}.
+ *
+ * @param mapper a {@link KeyValueMapper} that computes a new key for each record
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <KR> the new key type of the result stream
+ * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ */
+ <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
+ final Named named);
+
+ /**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
@@ -149,6 +209,46 @@ public interface KStream<K, V> {
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
/**
+ * Transform each record of the input stream into a new record in the output stream (both key and value type can be
+ * altered arbitrarily).
+ * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
+ * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
+ * stateful record transformation).
+ * <p>
+ * The example below normalizes the String key to upper-case letters and counts the number of tokens of the value string.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+ * KeyValue<String, Integer> apply(String key, String value) {
+ * return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
+ * }
+ * });
+ * }</pre>
+ * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
+ * <p>
+ * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
+ * join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
+ *
+ * @param mapper a {@link KeyValueMapper} that computes a new output record
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <KR> the key type of the result stream
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with new key and value (possibly both of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
+ final Named named);
+
+ /**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
@@ -182,6 +282,43 @@ public interface KStream<K, V> {
*/
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
+
+ /**
+ * Transform the value of each input record into a new value (with possible new type) of the output record.
+ * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation (cf.
+ * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+ * <p>
+ * The example below counts the number of tokens of the value string.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
+ * Integer apply(String value) {
+ * return value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * Setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+ *
+ * @param mapper a {@link ValueMapper} that computes a new output value
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+ final Named named);
+
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
@@ -218,6 +355,43 @@ public interface KStream<K, V> {
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
/**
+ * Transform the value of each input record into a new value (with possible new type) of the output record.
+ * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation (cf.
+ * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value transformation).
+ * <p>
+ * The example below counts the number of tokens of key and value strings.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
+ * Integer apply(String readOnlyKey, String value) {
+ * return readOnlyKey.split(" ").length + value.split(" ").length;
+ * }
+ * });
+ * }</pre>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+ *
+ * @param mapper a {@link ValueMapperWithKey} that computes a new output value
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
+ final Named named);
+
+ /**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
@@ -269,6 +443,59 @@ public interface KStream<K, V> {
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
/**
+ * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+ * can be altered arbitrarily).
+ * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
+ * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)} for
+ * stateful record transformation).
+ * <p>
+ * The example below splits input records {@code <null:String>} containing sentences as values into their words
+ * and emits a record {@code <word:1>} for each word.
+ * <pre>{@code
+ * KStream<byte[], String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.flatMap(
+ * new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
+ * Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
+ * String[] tokens = value.split(" ");
+ * List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
+ *
+ * for(String token : tokens) {
+ * result.add(new KeyValue<>(token, 1));
+ * }
+ *
+ * return result;
+ * }
+ * });
+ * }</pre>
+ * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+ * and the return value must not be {@code null}.
+ * <p>
+ * Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation
+ * or join) is applied to the result {@code KStream}. (cf. {@link #flatMapValues(ValueMapper)})
+ *
+ * @param mapper a {@link KeyValueMapper} that computes the new output records
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <KR> the key type of the result stream
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with new key and value (possibly of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #flatMapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #flatTransform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #flatTransformValues(ValueTransformerSupplier, String...)
+ * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
+ final Named named);
+
+ /**
* Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
* with the same key in the new stream.
* Transform the value of each input record into zero or more records with the same (unmodified) key in the output
@@ -316,6 +543,50 @@ public interface KStream<K, V> {
* with the same key in the new stream.
* Transform the value of each input record into zero or more records with the same (unmodified) key in the output
* stream (value type can be altered arbitrarily).
+ * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+ * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
+ * for stateful value transformation).
+ * <p>
+ * The example below splits input records {@code <null:String>} containing sentences as values into their words.
+ * <pre>{@code
+ * KStream<byte[], String> inputStream = builder.stream("topic");
+ * KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+ * Iterable<String> apply(String value) {
+ * return Arrays.asList(value.split(" "));
+ * }
+ * });
+ * }</pre>
+ * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+ * and the return value must not be {@code null}.
+ * <p>
+ * Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ *
+ * @param mapper a {@link ValueMapper} that computes the new output values
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #flatTransform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #flatTransformValues(ValueTransformerSupplier, String...)
+ * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
+ final Named named);
+ /**
+ * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
+ * with the same key in the new stream.
+ * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+ * stream (value type can be altered arbitrarily).
* The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
@@ -361,13 +632,63 @@ public interface KStream<K, V> {
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
/**
- * Print the records of this KStream using the options provided by {@link Printed}
- * Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print.
- * It <em>SHOULD NOT</em> be used for production usage if performance requirements are concerned.
- *
- * @param printed options for printing
- */
- void print(final Printed<K, V> printed);
+ * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values
+ * with the same key in the new stream.
+ * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+ * stream (value type can be altered arbitrarily).
+ * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+ * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
+ * for stateful value transformation).
+ * <p>
+ * The example below splits input records {@code <Integer:String>}, with key=1, containing sentences as values
+ * into their words.
+ * <pre>{@code
+ * KStream<Integer, String> inputStream = builder.stream("topic");
+ * KStream<Integer, String> outputStream = inputStream.flatMapValues(new ValueMapperWithKey<Integer, String, Iterable<String>>() {
+ * Iterable<String> apply(Integer readOnlyKey, String value) {
+ * if (readOnlyKey == 1) {
+ * return Arrays.asList(value.split(" "));
+ * } else {
+ * return Arrays.asList(value);
+ * }
+ * }
+ * });
+ * }</pre>
+ * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+ * and the return value must not be {@code null}.
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ *
+ * @param mapper a {@link ValueMapperWithKey} that computes the new output values
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #flatTransform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #flatTransformValues(ValueTransformerSupplier, String...)
+ * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
+ */
+ <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
+ final Named named);
+ /**
+ * Print the records of this KStream using the options provided by {@link Printed}.
+ * Note that this is mainly for debugging/testing purposes, and it will try to flush on each record print.
+ * It <em>SHOULD NOT</em> be used for production usage if performance requirements are concerned.
+ *
+ * @param printed options for printing
+ */
+ void print(final Printed<K, V> printed);
/**
* Perform an action on each record of {@code KStream}.
@@ -382,6 +703,17 @@ public interface KStream<K, V> {
/**
* Perform an action on each record of {@code KStream}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+ * Note that this is a terminal operation that returns void.
+ *
+ * @param action an action to perform on each record
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @see #process(ProcessorSupplier, String...)
+ */
+ void foreach(final ForeachAction<? super K, ? super V> action, final Named named);
+
+ /**
+ * Perform an action on each record of {@code KStream}.
+ * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
* <p>
* Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
* and returns an unchanged stream.
@@ -395,6 +727,22 @@ public interface KStream<K, V> {
KStream<K, V> peek(final ForeachAction<? super K, ? super V> action);
/**
+ * Perform an action on each record of {@code KStream}.
+ * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+ * <p>
+ * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
+ * and returns an unchanged stream.
+ * <p>
+ * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
+ *
+ * @param action an action to perform on each record
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @return itself
+ * @see #process(ProcessorSupplier, String...)
+ */
+ KStream<K, V> peek(final ForeachAction<? super K, ? super V> action, final Named named);
+
+ /**
* Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
* the supplied predicates.
* Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
@@ -411,6 +759,23 @@ public interface KStream<K, V> {
KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates);
/**
+ * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
+ * the supplied predicates.
+ * Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
+ * Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates.
+ * The branching happens on first-match: A record in the original stream is assigned to the corresponding result
+ * stream for the first predicate that evaluates to true, and is assigned to this stream only.
+ * A record will be dropped if none of the predicates evaluate to true.
+ * This is a stateless record-by-record operation.
+ *
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param predicates the ordered list of {@link Predicate} instances
+ * @return multiple distinct substreams of this {@code KStream}
+ */
+ @SuppressWarnings("unchecked")
+ KStream<K, V>[] branch(final Named named, final Predicate<? super K, ? super V>... predicates);
+
+ /**
* Merge this stream and the given stream into one larger stream.
* <p>
* There is no ordering guarantee between records from this {@code KStream} and records from
@@ -424,6 +789,20 @@ public interface KStream<K, V> {
KStream<K, V> merge(final KStream<K, V> stream);
/**
+ * Merge this stream and the given stream into one larger stream.
+ * <p>
+ * There is no ordering guarantee between records from this {@code KStream} and records from
+ * the provided {@code KStream} in the merged stream.
+ * Relative order is preserved within each input stream though (i.e., records within one input
+ * stream are processed in order).
+ *
+ * @param stream a stream which is to be merged into this stream
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @return a merged stream containing all records from this and the provided {@code KStream}
+ */
+ KStream<K, V> merge(final KStream<K, V> stream, final Named named);
+
+ /**
* Materialize this stream to a topic and creates a new {@code KStream} from the topic using default serializers,
* deserializers, and producer's {@link DefaultPartitioner}.
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
@@ -590,6 +969,99 @@ public interface KStream<K, V> {
final String... stateStoreNames);
/**
+ * Transform each record of the input stream into zero or one record in the output stream (both key and value type
+ * can be altered arbitrarily).
+ * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
+ * returns zero or one output record.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
+ * This is a stateful record-by-record operation (cf. {@link #map(KeyValueMapper) map()}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()},
+ * the processing progress can be observed and additional periodic actions can be performed.
+ * <p>
+ * In order to assign a state, the state must be created and registered beforehand (it's not required to connect
+ * global state stores; read-only access to global state stores is available by default):
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
+ * }</pre>
+ * Within the {@link Transformer}, the state is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
+ * transform()}.
+ * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+ * in which case no record is emitted.
+ * <pre>{@code
+ * new TransformerSupplier() {
+ * Transformer get() {
+ * return new Transformer() {
+ * private ProcessorContext context;
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.context = context;
+ * this.state = context.getStateStore("myTransformState");
+ * // punctuate each second; can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+ * }
+ *
+ * KeyValue transform(K key, V value) {
+ * // can access this.state
+ * return new KeyValue(key, value); // can emit a single value via return -- can also be null
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code transform()}.
+ * <p>
+ * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
+ * or join) is applied to the result {@code KStream}.
+ * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()} )
+ * <p>
+ * Note that it is possible to emit multiple records for each input record by using
+ * {@link ProcessorContext#forward(Object, Object) context#forward()} in
+ * {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+ * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+ * detected at runtime.
+ * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+ * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+ * If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
+ * for each input record, it is recommended to use {@link #flatTransform(TransformerSupplier, String...)
+ * flatTransform()}.
+ *
+ * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <K1> the key type of the new stream
+ * @param <V1> the value type of the new stream
+ * @return a {@code KStream} that contains more or fewer records with new key and value (possibly of different type)
+ * @see #map(KeyValueMapper)
+ * @see #flatTransform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #process(ProcessorSupplier, String...)
+ */
+ <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
+ final Named named,
+ final String... stateStoreNames);
+
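The `transform()` contract described above — at most one output record per input, with a `null` return emitting nothing — can be illustrated with a minimal, framework-free sketch. The `Pair` type and `transform`/`apply` methods below are illustrative stand-ins, not Kafka Streams classes:

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of the transform() contract: each input record
// yields at most one output KeyValue, and a null return means no record
// is emitted downstream. Pair is a stand-in for KeyValue.
public class TransformContractSketch {

    static final class Pair {
        final String key;
        final String value;
        Pair(String key, String value) { this.key = key; this.value = value; }
    }

    // Stand-in for Transformer#transform(K, V): returns a Pair or null.
    static Pair transform(String key, String value) {
        if (value == null || value.isEmpty()) {
            return null; // null return: no record emitted
        }
        // Changing the key here is why downstream key-based operators
        // may require a repartition step.
        return new Pair(key.toUpperCase(), value);
    }

    // Driver mirroring the DSL: null results contribute no records.
    static List<Pair> apply(List<Pair> input) {
        List<Pair> out = new ArrayList<>();
        for (Pair record : input) {
            Pair result = transform(record.key, record.value);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Pair> input = new ArrayList<>();
        input.add(new Pair("a", "hello"));
        input.add(new Pair("b", ""));       // dropped: transform returns null
        List<Pair> out = apply(input);
        System.out.println(out.size());     // 1
        System.out.println(out.get(0).key); // A
    }
}
```

The real API additionally threads a `ProcessorContext` through `init()` for state-store access and punctuation scheduling, which this sketch omits.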
+ /**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).
* A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
@@ -682,52 +1154,465 @@ public interface KStream<K, V> {
final String... stateStoreNames);
/**
- * Transform the value of each input record into a new value (with possibly a new type) of the output record.
- * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
- * record value and computes a new value for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
- * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
- * can be observed and additional periodic actions can be performed.
+ * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+ * can be altered arbitrarily).
+ * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
+ * returns zero or more output records.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
+ * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper) flatMap()} for stateless
+ * record transformation).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+ * the processing progress can be observed and additional periodic actions can be performed.
* <p>
- * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
- * connect global state stores; read-only access to global state stores is available by default):
+ * In order to assign a state store, the state store must be created and registered beforehand (it's not required
+ * to connect global state stores; read-only access to global state stores is available by default):
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
* Serdes.String(),
* Serdes.String());
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
- * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+ * KStream outputStream = inputStream.flatTransform(new TransformerSupplier() { ... }, "myTransformState");
* }</pre>
- * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
- * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null},
- * no records are emitted.
- * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
- * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
- * emit a {@link KeyValue} pair.
+ * Within the {@link Transformer}, the state store is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
+ * punctuate()}, a schedule must be registered.
+ * The {@link Transformer} must return an {@link java.lang.Iterable} type (e.g., any {@link java.util.Collection}
+ * type) in {@link Transformer#transform(Object, Object) transform()}.
+ * The return value of {@link Transformer#transform(Object, Object) Transformer#transform()} may be {@code null},
+ * which is equivalent to returning an empty {@link java.lang.Iterable Iterable}, i.e., no records are emitted.
* <pre>{@code
- * new ValueTransformerSupplier() {
- * ValueTransformer get() {
- * return new ValueTransformer() {
+ * new TransformerSupplier() {
+ * Transformer get() {
+ * return new Transformer() {
+ * private ProcessorContext context;
* private StateStore state;
*
* void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
+ * this.context = context;
+ * this.state = context.getStateStore("myTransformState");
+ * // punctuate each second; can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
- * NewValueType transform(V value) {
+ * Iterable<KeyValue> transform(K key, V value) {
* // can access this.state
- * return new NewValueType(); // or null
+ * List<KeyValue> result = new ArrayList<>();
+ * for (int i = 0; i < 3; i++) {
+ * result.add(new KeyValue(key, value));
+ * }
+ * return result; // emits a list of key-value pairs via return
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code flatTransform()}.
+ * <p>
+ * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
+ * or join) is applied to the result {@code KStream}.
+ * (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
+ * <p>
+ * Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
+ * context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+ * Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
+ * detected at runtime.
+ * To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
+ * not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
+ * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
+ *
+ * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <K1> the key type of the new stream
+ * @param <V1> the value type of the new stream
+ * @return a {@code KStream} that contains more or fewer records with new key and value (possibly of different type)
+ * @see #flatMap(KeyValueMapper)
+ * @see #transform(TransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #process(ProcessorSupplier, String...)
+ */
+ <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+ final Named named,
+ final String... stateStoreNames);
+
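The `flatTransform()` contract — zero or more output records per input, with a `null` return treated as an empty `Iterable` — can be sketched without the framework. The types and the comma-splitting logic below are illustrative, not Kafka Streams classes:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Framework-free sketch of the flatTransform() contract: each input record
// yields zero or more output records via an Iterable, and a null return is
// treated the same as an empty Iterable.
public class FlatTransformContractSketch {

    // Stand-in for Transformer#transform(K, V) returning Iterable<KeyValue>:
    // splits a comma-separated value into one (key, token) record per token.
    static List<String[]> flatTransform(String key, String value) {
        if (value == null) {
            return null; // equivalent to an empty Iterable: no output
        }
        List<String[]> result = new ArrayList<>();
        for (String token : value.split(",")) {
            result.add(new String[] {key, token});
        }
        return result;
    }

    // Driver mirroring the DSL: a null Iterable contributes no records.
    static List<String[]> apply(String key, String value) {
        List<String[]> result = flatTransform(key, value);
        return result == null ? Collections.emptyList() : result;
    }

    public static void main(String[] args) {
        System.out.println(apply("k", "a,b,c").size()); // 3
        System.out.println(apply("k", null).size());    // 0
    }
}
```

Returning a collection from `transform()` like this is the type-safe alternative to calling `context.forward()` that the Javadoc above recommends.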
+ /**
+ * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+ * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+ * record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
+ * <p>
+ * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+ * connect global state stores; read-only access to global state stores is available by default):
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+ * }</pre>
+ * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
+ * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null},
+ * no records are emitted.
+ * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
+ * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+ * emit a {@link KeyValue} pair.
+ * <pre>{@code
+ * new ValueTransformerSupplier() {
+ * ValueTransformer get() {
+ * return new ValueTransformer() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myValueTransformState");
+ * // punctuate each second, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+ * }
+ *
+ * NewValueType transform(V value) {
+ * // can access this.state
+ * return new NewValueType(); // or null
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code transformValues()}.
+ * <p>
+ * Setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ *
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+ * {@link ValueTransformer}
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ */
+ <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
+ final String... stateStoreNames);
+ /**
+ * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+ * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+ * record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
+ * <p>
+ * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+ * connect global state stores; read-only access to global state stores is available by default):
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+ * }</pre>
+ * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
+ * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@code null}, no
+ * records are emitted.
+ * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
+ * pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+ * emit a {@link KeyValue} pair.
+ * <pre>{@code
+ * new ValueTransformerSupplier() {
+ * ValueTransformer get() {
+ * return new ValueTransformer() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myValueTransformState");
+ * // punctuate each second, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+ * }
+ *
+ * NewValueType transform(V value) {
+ * // can access this.state
+ * return new NewValueType(); // or null
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code transformValues()}.
+ * <p>
+ * Setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ *
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+ * {@link ValueTransformer}
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ */
+ <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
+ final Named named,
+ final String... stateStoreNames);
+
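The key property of `transformValues()` — only the value changes, so the key and therefore the partitioning are preserved, and a `null` return drops the record — can be shown with a small standalone sketch. The parsing logic is illustrative only:

```java
// Framework-free sketch of the transformValues() contract: only the value is
// transformed (possibly to a new type), the key is never touched, which is
// why co-partitioning is preserved and no repartition topic is needed
// downstream. A null return drops the record.
public class TransformValuesContractSketch {

    // Stand-in for ValueTransformer#transform(V): new value, or null to drop.
    static Integer transformValue(String value) {
        try {
            return Integer.parseInt(value); // value type changes: String -> Integer
        } catch (NumberFormatException e) {
            return null; // null return: record is dropped, nothing emitted
        }
    }

    public static void main(String[] args) {
        System.out.println(transformValue("42"));   // 42
        System.out.println(transformValue("oops")); // null (record dropped)
    }
}
```

Unlike `transform()`, attempting to forward extra records from a value transformer raises a `StreamsException` in the real API, so the return value is the only output channel.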
+ /**
+ * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+ * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+ * each input record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
+ * <p>
+ * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+ * connect global state stores; read-only access to global state stores is available by default):
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+ * }</pre>
+ * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * The {@link ValueTransformerWithKey} must return the new value in
+ * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
+ * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+ * is {@code null}, no records are emitted.
+ * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+ * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+ * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+ * to emit a {@link KeyValue} pair.
+ * <pre>{@code
+ * new ValueTransformerWithKeySupplier() {
+ * ValueTransformerWithKey get() {
+ * return new ValueTransformerWithKey() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myValueTransformState");
+ * // punctuate each second, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+ * }
+ *
+ * NewValueType transform(K readOnlyKey, V value) {
+ * // can access this.state and use read-only key
+ * return new NewValueType(readOnlyKey); // or null
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code transformValues()}.
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ *
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+ * {@link ValueTransformerWithKey}
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ */
+ <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+ final String... stateStoreNames);
+
+ /**
+ * Transform the value of each input record into a new value (with possibly a new type) of the output record.
+ * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+ * each input record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
+ * <p>
+ * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
+ * connect global state stores; read-only access to global state stores is available by default):
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+ * }</pre>
+ * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * The {@link ValueTransformerWithKey} must return the new value in
+ * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
+ * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+ * is {@code null}, no records are emitted.
+ * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+ * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+ * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+ * to emit a {@link KeyValue} pair.
+ * <pre>{@code
+ * new ValueTransformerWithKeySupplier() {
+ * ValueTransformerWithKey get() {
+ * return new ValueTransformerWithKey() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myValueTransformState");
+ * // punctuate each second, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+ * }
+ *
+ * NewValueType transform(K readOnlyKey, V value) {
+ * // can access this.state and use read-only key
+ * return new NewValueType(readOnlyKey); // or null
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String) through()} should be performed before
+ * {@code transformValues()}.
+ * <p>
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ *
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+ * {@link ValueTransformerWithKey}
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <VR> the value type of the result stream
+ * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #mapValues(ValueMapper)
+ * @see #mapValues(ValueMapperWithKey)
+ * @see #transform(TransformerSupplier, String...)
+ */
+ <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+ final Named named,
+ final String... stateStoreNames);
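The with-key variant documented above makes the key available read-only: it may participate in computing the new value but must pass through unchanged, which is what keeps the partitioning intact. A minimal sketch, with an illustrative stand-in for `ValueTransformerWithKey#transform()`:

```java
// Sketch of the ValueTransformerWithKey contract: the key is readable when
// computing the new value but is never modified, so data co-location with
// respect to the key is preserved. Types here are illustrative stand-ins.
public class TransformValuesWithKeySketch {

    // Stand-in for ValueTransformerWithKey#transform(K readOnlyKey, V value):
    // the key is used read-only; the returned value is the only output.
    static String transform(String readOnlyKey, String value) {
        if (value == null) {
            return null; // null return: record is dropped
        }
        return readOnlyKey + ":" + value; // key consulted, never changed
    }

    public static void main(String[] args) {
        System.out.println(transform("user-1", "click")); // user-1:click
    }
}
```

Mutating the key object inside such a transformer would corrupt partitioning, which is why the Javadoc stresses that the key is read-only.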
+ /**
+ * Transform the value of each input record into zero or more new values (with possibly a new
+ * type) and emit for each new value a record with the same key as the input record and the new value.
+ * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+ * record value and computes zero or more new values.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+ * the processing progress can be observed and additional periodic actions can be performed.
+ * <p>
+ * In order to assign a state store, the state store must be created and registered beforehand:
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+ * }</pre>
+ * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
+ * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
+ * transform()}.
+ * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
+ * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
+ * In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
+ * {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
+ * can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+ * emit a {@link KeyValue} pair.
+ * <pre>{@code
+ * new ValueTransformerSupplier() {
+ * ValueTransformer get() {
+ * return new ValueTransformer() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myValueTransformState");
+ * // punctuate each second, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+ * }
+ *
+ * Iterable<NewValueType> transform(V value) {
+ * // can access this.state
+ * List<NewValueType> result = new ArrayList<>();
+ * for (int i = 0; i < 3; i++) {
+ * result.add(new NewValueType(value));
+ * }
+ * return result; // emits a list of new values via return
* }
*
* void close() {
@@ -739,35 +1624,38 @@ public interface KStream<K, V> {
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before
- * {@code transformValues()}.
+ * {@code flatTransformValues()}.
* <p>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
+ * flatTransform()})
*
- * @param valueTransformerSupplier a instance of {@link ValueTransformerSupplier} that generates a
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
* {@link ValueTransformer}
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @return a {@code KStream} that contains more or fewer records with unmodified key and new values (possibly of
+ * different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
+ * @see #flatTransform(TransformerSupplier, String...)
*/
- <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
- final String... stateStoreNames);
+ <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+ final String... stateStoreNames);
/**
- * Transform the value of each input record into a new value (with possibly a new type) of the output record.
- * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
- * each input record value and computes a new value for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
- * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
- * can be observed and additional periodic actions can be performed.
+ * Transform the value of each input record into zero or more new values (with possibly a new
+ * type) and emit, for each new value, a record with the same key as the input record and the new value.
+ * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+ * record value and computes zero or more new values.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
+ * the processing progress can be observed and additional periodic actions can be performed.
* <p>
- * In order to assign a state store, the state store must be created and registered beforehand (it's not required to
- * connect global state stores; read-only access to global state stores is available by default):
+ * In order to assign a state store, the state store must be created and registered beforehand:
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
@@ -777,24 +1665,25 @@ public interface KStream<K, V> {
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
- * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+ * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
* }</pre>
- * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
+ * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
- * The {@link ValueTransformerWithKey} must return the new value in
- * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
- * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
- * is {@code null}, no records are emitted.
+ * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
+ * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
+ * transform()}.
+ * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
+ * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
- * to emit a {@link KeyValue} pair.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
+ * emit a {@link KeyValue} pair.
* <pre>{@code
- * new ValueTransformerWithKeySupplier() {
- * ValueTransformerWithKey get() {
- * return new ValueTransformerWithKey() {
+ * new ValueTransformerSupplier() {
+ * ValueTransformer get() {
+ * return new ValueTransformer() {
* private StateStore state;
*
* void init(ProcessorContext context) {
@@ -803,9 +1692,13 @@ public interface KStream<K, V> {
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
- * NewValueType transform(K readOnlyKey, V value) {
- * // can access this.state and use read-only key
- * return new NewValueType(readOnlyKey); // or null
+ * Iterable<NewValueType> transform(V value) {
+ * // can access this.state
+ * List<NewValueType> result = new ArrayList<>();
+ * for (int i = 0; i < 3; i++) {
+ * result.add(new NewValueType(value));
+ * }
+ * return result; // values
* }
*
* void close() {
@@ -817,34 +1710,38 @@ public interface KStream<K, V> {
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before
- * {@code transformValues()}.
+ * {@code flatTransformValues()}.
* <p>
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
- * So, setting a new value preserves data co-location with respect to the key.
+ * Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ * is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
+ * flatTransform()})
*
- * @param valueTransformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a
- * {@link ValueTransformerWithKey}
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+ * {@link ValueTransformer}
+ * @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @return a {@code KStream} that contains fewer or more records with unmodified key and new values (possibly of
+ * a different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #transform(TransformerSupplier, String...)
+ * @see #flatTransform(TransformerSupplier, String...)
*/
- <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
- final String... stateStoreNames);
+ <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+ final Named named,
+ final String... stateStoreNames);
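The `flatTransformValues()` contract described above can be illustrated with a plain-Java sketch (hypothetical names, no Kafka Streams dependency): each input value maps to zero or more output values, and every output record keeps the input record's key.

```java
import java.util.*;
import java.util.function.Function;

// Minimal sketch of the flatTransformValues contract: a value transformer
// returns an Iterable of new values; each output record keeps the input key.
// All names here are illustrative, not the Kafka Streams API.
public class FlatTransformValuesSketch {
    static <K, V, VR> List<Map.Entry<K, VR>> flatTransformValues(
            List<Map.Entry<K, V>> records,
            Function<V, Iterable<VR>> transformer) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            Iterable<VR> newValues = transformer.apply(record.getValue());
            // An empty Iterable or null means: emit no records for this input.
            if (newValues == null) continue;
            for (VR newValue : newValues) {
                out.add(new AbstractMap.SimpleEntry<>(record.getKey(), newValue));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input =
            List.of(new AbstractMap.SimpleEntry<>("k1", "a,b"),
                    new AbstractMap.SimpleEntry<>("k2", ""));
        // Split each value on commas; empty values produce no output records.
        List<Map.Entry<String, String>> out = flatTransformValues(input, v ->
            v.isEmpty() ? List.of() : Arrays.asList(v.split(",")));
        System.out.println(out); // [k1=a, k1=b]
    }
}
```

Note how the sketch can emit fewer or more records than it consumes, which is why the Javadoc's return description differs from the one-to-one `transformValues()`.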
/**
* Transform the value of each input record into zero or more new values (with possibly a new
* type) and emit, for each new value, a record with the same key as the input record and the new value.
- * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
- * record value and computes zero or more new values.
+ * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
+ * each input record value and computes zero or more new values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
- * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
- * the processing progress can be observed and additional periodic actions can be performed.
+ * This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
+ * be observed and additional periodic actions can be performed.
* <p>
* In order to assign a state store, the state store must be created and registered beforehand:
* <pre>{@code
@@ -856,25 +1753,25 @@ public interface KStream<K, V> {
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
- * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+ * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
* }</pre>
- * Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
+ * Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
- * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
- * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
+ * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
+ * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
* transform()}.
- * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
- * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
+ * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
+ * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
- * emit a {@link KeyValue} pair.
+ * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
+ * to emit a {@link KeyValue} pair.
* <pre>{@code
- * new ValueTransformerSupplier() {
- * ValueTransformer get() {
- * return new ValueTransformer() {
+ * new ValueTransformerWithKeySupplier() {
+ * ValueTransformerWithKey get() {
+ * return new ValueTransformerWithKey() {
* private StateStore state;
*
* void init(ProcessorContext context) {
@@ -883,11 +1780,11 @@ public interface KStream<K, V> {
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
- * Iterable<NewValueType> transform(V value) {
- * // can access this.state
+ * Iterable<NewValueType> transform(K readOnlyKey, V value) {
+ * // can access this.state and use read-only key
* List<NewValueType> result = new ArrayList<>();
* for (int i = 0; i < 3; i++) {
- * result.add(new NewValueType(value));
+ * result.add(new NewValueType(readOnlyKey));
* }
* return result; // values
* }
@@ -903,13 +1800,14 @@ public interface KStream<K, V> {
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before
* {@code flatTransformValues()}.
* <p>
- * Setting a new value preserves data co-location with respect to the key.
+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
+ * So, setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
* flatTransform()})
*
- * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
- * {@link ValueTransformer}
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
+ * {@link ValueTransformerWithKey}
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains fewer or more records with unmodified key and new values (possibly of
@@ -919,7 +1817,7 @@ public interface KStream<K, V> {
* @see #transform(TransformerSupplier, String...)
* @see #flatTransform(TransformerSupplier, String...)
*/
- <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+ <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames);
/**
@@ -929,8 +1827,8 @@ public interface KStream<K, V> {
* each input record value and computes zero or more new values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()} the processing
- * progress can be observed and additional periodic actions can be performed.
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
+ * be observed and additional periodic actions can be performed.
* <p>
* In order to assign a state store, the state store must be created and registered beforehand:
* <pre>{@code
@@ -997,6 +1895,7 @@ public interface KStream<K, V> {
*
* @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
* {@link ValueTransformerWithKey}
+ * @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains fewer or more records with unmodified key and new values (possibly of
@@ -1007,8 +1906,67 @@ public interface KStream<K, V> {
* @see #flatTransform(TransformerSupplier, String...)
*/
<VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+ final Named named,
final String... stateStoreNames);
+ /**
+ * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
+ * {@link ProcessorSupplier}).
+ * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
+ * can be observed and additional periodic actions can be performed.
+ * Note that this is a terminal operation that returns void.
+ * <p>
+ * In order to assign a state store, the state store must be created and registered beforehand (it's not required
+ * to connect global state stores; read-only access to global state stores is available by default):
+ * <pre>{@code
+ * // create store
+ * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+ * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
+ * Serdes.String(),
+ * Serdes.String());
+ * // register store
+ * builder.addStateStore(keyValueStoreBuilder);
+ *
+ * inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
+ * }</pre>
+ * Within the {@link Processor}, the state store is obtained via the {@link ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * <pre>{@code
+ * new ProcessorSupplier() {
+ * Processor get() {
+ * return new Processor() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myProcessorState");
+ * // punctuate each second, can access this.state
+ * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+ * }
+ *
+ * void process(K key, V value) {
+ * // can access this.state
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * Even if any upstream operation was key-changing, no auto-repartition is triggered.
+ * If repartitioning is required, a call to {@link #through(String)} should be performed before {@code process()}.
+ *
+ * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor}
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @see #foreach(ForeachAction)
+ * @see #transform(TransformerSupplier, String...)
+ */
+ void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+ final String... stateStoreNames);
/**
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
@@ -1062,11 +2020,13 @@ public interface KStream<K, V> {
* If repartitioning is required, a call to {@link #through(String)} should be performed before {@code process()}.
*
* @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor}
+ * @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor
* @see #foreach(ForeachAction)
* @see #transform(TransformerSupplier, String...)
*/
void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+ final Named named,
final String... stateStoreNames);
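The stateful, terminal `process()` contract above can be sketched without Kafka as a consumer that mutates a local "state store" and forwards nothing downstream; all names here are illustrative, not the Kafka Streams Processor API.

```java
import java.util.*;
import java.util.function.BiConsumer;

// Sketch of the terminal process() contract: a processor sees each record
// once, may mutate a state store, and returns void (nothing is forwarded).
public class ProcessSketch {
    static <K, V> void process(List<Map.Entry<K, V>> records,
                               BiConsumer<K, V> processor) {
        for (Map.Entry<K, V> record : records) {
            processor.accept(record.getKey(), record.getValue()); // terminal: no output records
        }
    }

    public static void main(String[] args) {
        // "State store": counts records per key, like a persistent KeyValueStore.
        Map<String, Integer> countsStore = new HashMap<>();
        List<Map.Entry<String, String>> input =
            List.of(new AbstractMap.SimpleEntry<>("k1", "a"),
                    new AbstractMap.SimpleEntry<>("k1", "b"),
                    new AbstractMap.SimpleEntry<>("k2", "c"));
        process(input, (k, v) -> countsStore.merge(k, 1, Integer::sum));
        System.out.println(countsStore); // {k1=2, k2=1}
    }
}
```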
/**
@@ -2083,6 +3043,40 @@ public interface KStream<K, V> {
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
/**
+ * Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
+ * The join is a primary key table lookup join with join attribute
+ * {@code keyValueMapper.map(stream.keyValue) == table.key}.
+ * "Table lookup join" means that results are only computed if {@code KStream} records are processed.
+ * This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
+ * state.
+ * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
+ * state and will not produce any result records.
+ * <p>
+ * For each {@code KStream} record that finds a corresponding record in the {@link GlobalKTable}, the provided
+ * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as the key of this {@code KStream}.
+ * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
+ * operation and thus no output record will be added to the resulting {@code KStream}.
+ * If {@code keyValueMapper} returns {@code null} implying no match exists, no output record will be added to the
+ * resulting {@code KStream}.
+ *
+ * @param globalKTable the {@link GlobalKTable} to be joined with this stream
+ * @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
+ * to the key of the {@link GlobalKTable}
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <GK> the key type of {@link GlobalKTable}
+ * @param <GV> the value type of the {@link GlobalKTable}
+ * @param <RV> the value type of the resulting {@code KStream}
+ * @return a {@code KStream} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one output for each input {@code KStream} record
+ * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+ */
+ <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
+ final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
+ final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
+ final Named named);
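The inner lookup-join semantics documented above can be sketched in plain Java (illustrative names, no Kafka dependency): for each stream record, the mapped key is looked up in the table's current state, and non-matches, null keys/values, and null lookup keys produce no output.

```java
import java.util.*;
import java.util.function.BiFunction;

// Sketch of the GlobalKTable inner lookup join: results are computed only
// while stream records are processed, by a lookup into the table's state.
public class GlobalTableJoinSketch {
    static <K, V, GK, GV, RV> List<Map.Entry<K, RV>> join(
            List<Map.Entry<K, V>> stream,
            Map<GK, GV> globalTable,
            BiFunction<K, V, GK> keyValueMapper,
            BiFunction<V, GV, RV> joiner) {
        List<Map.Entry<K, RV>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : stream) {
            // Records with a null key or value are dropped from the join.
            if (record.getKey() == null || record.getValue() == null) continue;
            GK lookupKey = keyValueMapper.apply(record.getKey(), record.getValue());
            if (lookupKey == null) continue;           // null mapping: no match possible
            GV tableValue = globalTable.get(lookupKey);
            if (tableValue == null) continue;          // inner join: skip non-matches
            out.add(new AbstractMap.SimpleEntry<>(record.getKey(),
                    joiner.apply(record.getValue(), tableValue)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = Map.of("u1", "Alice");
        List<Map.Entry<String, String>> stream =
            List.of(new AbstractMap.SimpleEntry<>("o1", "u1"),
                    new AbstractMap.SimpleEntry<>("o2", "u9")); // no table match
        System.out.println(join(stream, table, (k, v) -> v,
                (v, gv) -> v + ":" + gv)); // [o1=u1:Alice]
    }
}
```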
+ /**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
* In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream
* will produce an output record (cf. below).
@@ -2118,4 +3112,43 @@ public interface KStream<K, V> {
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);
+
+ /**
+ * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
+ * In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream
+ * will produce an output record (cf. below).
+ * The join is a primary key table lookup join with join attribute
+ * {@code keyValueMapper.map(stream.keyValue) == table.key}.
+ * "Table lookup join" means that results are only computed if {@code KStream} records are processed.
+ * This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
+ * state.
+ * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
+ * state and will not produce any result records.
+ * <p>
+ * For each {@code KStream} record, whether or not it finds a corresponding record in the {@link GlobalKTable}, the
+ * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as the key of this {@code KStream}.
+ * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join
+ * operation and thus no output record will be added to the resulting {@code KStream}.
+ * If {@code keyValueMapper} returns {@code null} implying no match exists, a {@code null} value will be
+ * provided to {@link ValueJoiner}.
+ * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
+ * {@link ValueJoiner}.
+ *
+ * @param globalKTable the {@link GlobalKTable} to be joined with this stream
+ * @param keyValueMapper instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
+ * to the key of the {@link GlobalKTable}
+ * @param valueJoiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param named a {@link Named} config used to name the processor in the topology
+ * @param <GK> the key type of {@link GlobalKTable}
+ * @param <GV> the value type of the {@link GlobalKTable}
+ * @param <RV> the value type of the resulting {@code KStream}
+ * @return a {@code KStream} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one output for each input {@code KStream} record
+ * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+ */
+ <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
+ final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
+ final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner,
+ final Named named);
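The left-join variant differs from the inner join only in its non-match behavior: every stream record with a non-null key and value produces exactly one output, and the joiner receives {@code null} for the table side when no match is found. A plain-Java sketch (illustrative names only):

```java
import java.util.*;
import java.util.function.BiFunction;

// Sketch of the GlobalKTable left lookup join: emit one output per non-null
// stream record; a missing table entry (or null lookup key) yields a null
// table-side value for the joiner.
public class GlobalTableLeftJoinSketch {
    static <K, V, GK, GV, RV> List<Map.Entry<K, RV>> leftJoin(
            List<Map.Entry<K, V>> stream,
            Map<GK, GV> globalTable,
            BiFunction<K, V, GK> keyValueMapper,
            BiFunction<V, GV, RV> joiner) {
        List<Map.Entry<K, RV>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : stream) {
            if (record.getKey() == null || record.getValue() == null) continue;
            GK lookupKey = keyValueMapper.apply(record.getKey(), record.getValue());
            GV tableValue = lookupKey == null ? null : globalTable.get(lookupKey);
            // Left join: emit even when tableValue is null.
            out.add(new AbstractMap.SimpleEntry<>(record.getKey(),
                    joiner.apply(record.getValue(), tableValue)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = Map.of("u1", "Alice");
        List<Map.Entry<String, String>> stream =
            List.of(new AbstractMap.SimpleEntry<>("o1", "u1"),
                    new AbstractMap.SimpleEntry<>("o2", "u9"));
        System.out.println(leftJoin(stream, table, (k, v) -> v,
                (v, gv) -> v + ":" + gv)); // [o1=u1:Alice, o2=u9:null]
    }
}
```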
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 67729ec..9d76033 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
@@ -128,47 +129,64 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) {
- Objects.requireNonNull(predicate, "predicate can't be null");
- final String name = builder.newProcessorName(FILTER_NAME);
-
+ return filter(predicate, NamedInternal.empty());
+ }
+ @Override
+ public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
- return new KStreamImpl<>(name,
- keySerde,
- valSerde,
- sourceNodes,
- repartitionRequired,
- filterProcessorNode,
- builder);
+ return new KStreamImpl<>(
+ name,
+ keySerde,
+ valSerde,
+ sourceNodes,
+ repartitionRequired,
+ filterProcessorNode,
+ builder);
}
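The `orElseGenerateWithPrefix` pattern that this change applies to every operator can be sketched as: use the user-supplied name if one was given, otherwise fall back to a name generated from the operator prefix and a running index. The class name and generated-name format below are illustrative, not the `NamedInternal` implementation.

```java
import java.util.Optional;

// Sketch of the naming fallback used throughout KStreamImpl: a user-supplied
// Named wins; otherwise a name is generated from a prefix plus an index.
public class NamingSketch {
    private int index = 0;

    String orElseGenerateWithPrefix(Optional<String> userName, String prefix) {
        // The index advances only when a name is actually generated.
        return userName.orElseGet(() -> String.format("%s%010d", prefix, index++));
    }

    public static void main(String[] args) {
        NamingSketch builder = new NamingSketch();
        System.out.println(builder.orElseGenerateWithPrefix(
            Optional.of("my-filter"), "KSTREAM-FILTER-"));  // my-filter
        System.out.println(builder.orElseGenerateWithPrefix(
            Optional.empty(), "KSTREAM-FILTER-"));          // KSTREAM-FILTER-0000000000
    }
}
```

Generating only on fallback keeps generated indices stable when users name some operators, which is the point of KIP-307: named processors survive topology changes that would otherwise renumber them.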
@Override
public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
- Objects.requireNonNull(predicate, "predicate can't be null");
- final String name = builder.newProcessorName(FILTER_NAME);
+ return filterNot(predicate, NamedInternal.empty());
+ }
+ @Override
+ public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
- return new KStreamImpl<>(name,
- keySerde,
- valSerde,
- sourceNodes,
- repartitionRequired,
- filterNotProcessorNode,
- builder);
+ return new KStreamImpl<>(
+ name,
+ keySerde,
+ valSerde,
+ sourceNodes,
+ repartitionRequired,
+ filterNotProcessorNode,
+ builder);
}
@Override
public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
+ return selectKey(mapper, NamedInternal.empty());
+ }
+
+ @Override
+ public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper, final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
+ Objects.requireNonNull(named, "named can't be null");
- final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper, NamedInternal.empty());
+ final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named));
selectKeyProcessorNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
@@ -177,7 +195,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return new KStreamImpl<>(selectKeyProcessorNode.nodeName(), null, valSerde, sourceNodes, true, selectKeyProcessorNode, builder);
}
-
private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
final NamedInternal named) {
final String name = named.orElseGenerateWithPrefix(builder, KEY_SELECT_NAME);
@@ -190,9 +207,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
- Objects.requireNonNull(mapper, "mapper can't be null");
- final String name = builder.newProcessorName(MAP_NAME);
+ return map(mapper, NamedInternal.empty());
+ }
+ @Override
+ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final Named named) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name);
final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -201,26 +223,36 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
// key and value serde cannot be preserved
- return new KStreamImpl<>(name,
- null,
- null,
- sourceNodes,
- true,
- mapProcessorNode,
- builder);
+ return new KStreamImpl<>(
+ name,
+ null,
+ null,
+ sourceNodes,
+ true,
+ mapProcessorNode,
+ builder);
}
-
@Override
public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
return mapValues(withKey(mapper));
}
@Override
+ public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Named named) {
+ return mapValues(withKey(mapper), named);
+ }
+
+ @Override
public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
- Objects.requireNonNull(mapper, "mapper can't be null");
- final String name = builder.newProcessorName(MAPVALUES_NAME);
+ return mapValues(mapper, NamedInternal.empty());
+ }
+ @Override
+ public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, final Named named) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -228,13 +260,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
// value serde cannot be preserved
- return new KStreamImpl<>(name,
- keySerde,
- null,
- sourceNodes,
- repartitionRequired,
- mapValuesProcessorNode,
- builder);
+ return new KStreamImpl<>(
+ name,
+ keySerde,
+ null,
+ sourceNodes,
+ repartitionRequired,
+ mapValuesProcessorNode,
+ builder);
}
@Override
@@ -250,9 +283,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
- Objects.requireNonNull(mapper, "mapper can't be null");
- final String name = builder.newProcessorName(FLATMAP_NAME);
+ return flatMap(mapper, NamedInternal.empty());
+ }
+ @Override
+ public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
+ final Named named) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAP_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters);
flatMapNode.keyChangingOperation(true);
@@ -260,13 +299,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(this.streamsGraphNode, flatMapNode);
// key and value serde cannot be preserved
- return new KStreamImpl<>(name,
- null,
- null,
- sourceNodes,
- true,
- flatMapNode,
- builder);
+ return new KStreamImpl<>(name, null, null, sourceNodes, true, flatMapNode, builder);
}
@Override
@@ -275,10 +308,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
+ public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
+ final Named named) {
+ return flatMapValues(withKey(mapper), named);
+ }
+
+ @Override
public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
- Objects.requireNonNull(mapper, "mapper can't be null");
- final String name = builder.newProcessorName(FLATMAPVALUES_NAME);
+ return flatMapValues(mapper, NamedInternal.empty());
+ }
+ @Override
+ public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
+ final Named named) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAPVALUES_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -292,6 +337,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@SuppressWarnings("unchecked")
public KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates) {
+ return doBranch(NamedInternal.empty(), predicates);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates) {
+ Objects.requireNonNull(name, "name can't be null");
+ return doBranch(new NamedInternal(name), predicates);
+ }
+
+ @SuppressWarnings("unchecked")
+ private KStream<K, V>[] doBranch(final NamedInternal named,
+ final Predicate<? super K, ? super V>... predicates) {
if (predicates.length == 0) {
throw new IllegalArgumentException("you must provide at least one predicate");
}
@@ -299,11 +357,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(predicate, "predicates can't have null values");
}
- final String branchName = builder.newProcessorName(BRANCH_NAME);
+ final String branchName = named.orElseGenerateWithPrefix(builder, BRANCH_NAME);
final String[] childNames = new String[predicates.length];
for (int i = 0; i < predicates.length; i++) {
- childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
+ childNames[i] = named.suffixWithOrElseGet("-predicate-" + i, builder, BRANCHCHILD_NAME);
}
final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
@@ -326,13 +384,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public KStream<K, V> merge(final KStream<K, V> stream) {
Objects.requireNonNull(stream);
- return merge(builder, stream);
+ return merge(builder, stream, NamedInternal.empty());
+ }
+
+ @Override
+ public KStream<K, V> merge(final KStream<K, V> stream, final Named processorName) {
+ Objects.requireNonNull(stream);
+ return merge(builder, stream, new NamedInternal(processorName));
}
private KStream<K, V> merge(final InternalStreamsBuilder builder,
- final KStream<K, V> stream) {
+ final KStream<K, V> stream,
+ final NamedInternal processorName) {
final KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
- final String name = builder.newProcessorName(MERGE_NAME);
+ final String name = processorName.orElseGenerateWithPrefix(builder, MERGE_NAME);
final Set<String> allSourceNodes = new HashSet<>();
final boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
@@ -341,9 +406,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name);
-
final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name, processorParameters);
-
mergeNode.setMergeNode(true);
builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode);
@@ -353,12 +416,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public void foreach(final ForeachAction<? super K, ? super V> action) {
- Objects.requireNonNull(action, "action can't be null");
- final String name = builder.newProcessorName(FOREACH_NAME);
+ foreach(action, NamedInternal.empty());
+ }
+ @Override
+ public void foreach(final ForeachAction<? super K, ? super V> action, final Named named) {
+ Objects.requireNonNull(action, "action can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(
- new KStreamPeek<>(action, false),
- name
+ new KStreamPeek<>(action, false),
+ name
);
final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -367,12 +434,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
+ return peek(action, NamedInternal.empty());
+ }
+
+ @Override
+ public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action, final Named named) {
Objects.requireNonNull(action, "action can't be null");
- final String name = builder.newProcessorName(PEEK_NAME);
+ Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(
- new KStreamPeek<>(action, true),
- name
+ new KStreamPeek<>(action, true),
+ name
);
final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -459,56 +532,89 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(this.streamsGraphNode, sinkNode);
}
- private <K1, V1> KStream<K1, V1> doFlatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
- final String... stateStoreNames) {
+ @Override
+ public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
final String name = builder.newProcessorName(TRANSFORM_NAME);
- final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
- name,
- new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), name),
- stateStoreNames
- );
-
- transformNode.keyChangingOperation(true);
- builder.addGraphNode(this.streamsGraphNode, transformNode);
-
- // cannot inherit key and value serde
- return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
+ return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), Named.as(name), stateStoreNames);
}
@Override
public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
+ final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
- return doFlatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
+ return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), named, stateStoreNames);
+ }
+
+ @Override
+ public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
+ final String name = builder.newProcessorName(TRANSFORM_NAME);
+ return flatTransform(transformerSupplier, Named.as(name), stateStoreNames);
}
@Override
public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+ final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
- return doFlatTransform(transformerSupplier, stateStoreNames);
+ Objects.requireNonNull(named, "named can't be null");
+
+ final String name = new NamedInternal(named).name();
+ final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
+ name,
+ new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), name),
+ stateStoreNames
+ );
+
+ transformNode.keyChangingOperation(true);
+ builder.addGraphNode(streamsGraphNode, transformNode);
+
+ // cannot inherit key and value serde
+ return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
}
@Override
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+ return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), NamedInternal.empty(), stateStoreNames);
+ }
- return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
+ @Override
+ public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
+ final Named named,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier),
+ new NamedInternal(named), stateStoreNames);
}
@Override
public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+ return doTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
+ }
- return doTransformValues(valueTransformerSupplier, stateStoreNames);
+ @Override
+ public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
+ final Named named,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ return doTransformValues(valueTransformerSupplier, new NamedInternal(named), stateStoreNames);
}
private <VR> KStream<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier,
+ final NamedInternal named,
final String... stateStoreNames) {
- final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+ final String name = named.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
@@ -527,20 +633,39 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
- return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
+ return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), NamedInternal.empty(), stateStoreNames);
+ }
+
+ @Override
+ public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
+ final Named named,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+
+ return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), named, stateStoreNames);
+ }
+
+ @Override
+ public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
+
+ return doFlatTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
}
@Override
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
+ final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
- return doFlatTransformValues(valueTransformerSupplier, stateStoreNames);
+ return doFlatTransformValues(valueTransformerSupplier, named, stateStoreNames);
}
private <VR> KStream<K, VR> doFlatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier,
+ final Named named,
final String... stateStoreNames) {
- final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
@@ -561,11 +686,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
final String name = builder.newProcessorName(PROCESSOR_NAME);
+ process(processorSupplier, Named.as(name), stateStoreNames);
+ }
+
+ @Override
+ public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+ final Named named,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ final String name = new NamedInternal(named).name();
final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
- name,
- new ProcessorParameters<>(processorSupplier, name),
- stateStoreNames
+ name,
+ new ProcessorParameters<>(processorSupplier, name),
+ stateStoreNames
);
builder.addGraphNode(this.streamsGraphNode, processNode);
@@ -621,14 +756,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
- final String name = joinedInternal.name();
+ final NamedInternal name = new NamedInternal(joinedInternal.name());
if (joinThis.repartitionRequired) {
- final String leftJoinRepartitionTopicName = name != null ? name + "-left" : joinThis.name;
+ final String joinThisName = joinThis.name;
+ final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName);
joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
}
if (joinOther.repartitionRequired) {
- final String rightJoinRepartitionTopicName = name != null ? name + "-right" : joinOther.name;
+ final String joinOtherName = joinOther.name;
+ final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName);
joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
}
@@ -779,26 +916,45 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
- return globalTableJoin(globalTable, keyMapper, joiner, false);
+ return globalTableJoin(globalTable, keyMapper, joiner, false, NamedInternal.empty());
+ }
+
+ @Override
+ public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable,
+ final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+ final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
+ final Named named) {
+ return globalTableJoin(globalTable, keyMapper, joiner, false, named);
}
@Override
public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
- return globalTableJoin(globalTable, keyMapper, joiner, true);
+ return globalTableJoin(globalTable, keyMapper, joiner, true, NamedInternal.empty());
}
+ @Override
+ public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable,
+ final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
+ final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
+ final Named named) {
+ return globalTableJoin(globalTable, keyMapper, joiner, true, named);
+ }
+
+
private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper,
final ValueJoiner<? super V, ? super VG, ? extends VR> joiner,
- final boolean leftJoin) {
+ final boolean leftJoin,
+ final Named named) {
Objects.requireNonNull(globalTable, "globalTable can't be null");
Objects.requireNonNull(keyMapper, "keyMapper can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
+ Objects.requireNonNull(named, "named can't be null");
final KTableValueGetterSupplier<KG, VG> valueGetterSupplier = ((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier();
- final String name = builder.newProcessorName(LEFTJOIN_NAME);
+ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, LEFTJOIN_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
valueGetterSupplier,
@@ -828,7 +984,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
- final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+ final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
+ final NamedInternal renamed = new NamedInternal(joinedInternal.name());
+
+ final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
((KTableImpl<K, ?, VO>) other).valueGetterSupplier(),
joiner,
@@ -945,12 +1104,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
final JoinWindows windows,
final Joined<K1, V1, V2> joined) {
- final String thisWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
- final String otherWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
- final String joinThisName = rightOuter ? builder.newProcessorName(OUTERTHIS_NAME) : builder.newProcessorName(JOINTHIS_NAME);
- final String joinOtherName = leftOuter ? builder.newProcessorName(OUTEROTHER_NAME) : builder.newProcessorName(JOINOTHER_NAME);
- final String joinMergeName = builder.newProcessorName(MERGE_NAME);
+ final JoinedInternal<K1, V1, V2> joinedInternal = new JoinedInternal<>(joined);
+ final NamedInternal renamed = new NamedInternal(joinedInternal.name());
+
+ final String thisWindowStreamName = renamed.suffixWithOrElseGet(
+ "-this-windowed", builder, WINDOWED_NAME);
+ final String otherWindowStreamName = renamed.suffixWithOrElseGet(
+ "-other-windowed", builder, WINDOWED_NAME);
+
+ final String joinThisName = rightOuter ?
+ renamed.suffixWithOrElseGet("-outer-this-join", builder, OUTERTHIS_NAME)
+ : renamed.suffixWithOrElseGet("-this-join", builder, JOINTHIS_NAME);
+ final String joinOtherName = leftOuter ?
+ renamed.suffixWithOrElseGet("-outer-other-join", builder, OUTEROTHER_NAME)
+ : renamed.suffixWithOrElseGet("-other-join", builder, JOINOTHER_NAME);
+ final String joinMergeName = renamed.suffixWithOrElseGet(
+ "-merge", builder, MERGE_NAME);
final StreamsGraphNode thisStreamsGraphNode = ((AbstractStream) lhs).streamsGraphNode;
final StreamsGraphNode otherStreamsGraphNode = ((AbstractStream) other).streamsGraphNode;
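Editor's note on the naming rule the join code above relies on: when the user supplies a base name (e.g. via `Joined.as(...)`), each internal processor gets that base name plus a fixed suffix such as `-this-windowed` or `-merge`; otherwise the old auto-generated, prefix-plus-counter name is kept. A minimal, dependency-free sketch of this behavior (the `NameGenerator` class below is an illustrative stand-in for `InternalNameProvider`, not Kafka API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for InternalNameProvider: generates "PREFIX-0000000042" style names.
class NameGenerator {
    private final AtomicInteger index = new AtomicInteger();

    String newProcessorName(final String prefix) {
        return String.format("%s-%010d", prefix, index.getAndIncrement());
    }

    // Mirrors NamedInternal.suffixWithOrElseGet(suffix, builder, prefix):
    // a user-supplied base name plus suffix wins; otherwise fall back to a
    // generated, prefix-based processor name.
    String suffixWithOrElseGet(final String userName, final String suffix, final String prefix) {
        return userName != null ? userName + suffix : newProcessorName(prefix);
    }

    public static void main(final String[] args) {
        final NameGenerator names = new NameGenerator();
        // With a user name such as Joined.as("my-join"), every join processor
        // derives from the same base name:
        System.out.println(names.suffixWithOrElseGet("my-join", "-this-windowed", "KSTREAM-WINDOWED"));
        // Without a user name, the auto-generated scheme is kept:
        System.out.println(names.suffixWithOrElseGet(null, "-this-windowed", "KSTREAM-WINDOWED"));
    }
}
```

Note that the user-named path does not consume a counter index, which is why explicitly naming one operation does not shift the generated numbers of later unnamed operations.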
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
index d478e9b..532928a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.kstream.Named;
public class NamedInternal extends Named {
public static NamedInternal empty() {
- return new NamedInternal(null);
+ return new NamedInternal((String) null);
}
public static NamedInternal with(final String name) {
@@ -31,6 +31,15 @@ public class NamedInternal extends Named {
/**
* Creates a new {@link NamedInternal} instance.
*
+ * @param internal the internal name.
+ */
+ NamedInternal(final Named internal) {
+ super(internal);
+ }
+
+ /**
+ * Creates a new {@link NamedInternal} instance.
+ *
* @param internal the internal name.
*/
NamedInternal(final String internal) {
@@ -48,6 +57,14 @@ public class NamedInternal extends Named {
public NamedInternal withName(final String name) {
return new NamedInternal(name);
}
+
+ String suffixWithOrElseGet(final String suffix, final String other) {
+ if (name != null) {
+ return name + suffix;
+ } else {
+ return other;
+ }
+ }
String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
// We actually do not need to generate processor names for operation if a name is specified.
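Editor's note: the new two-argument `suffixWithOrElseGet(suffix, other)` overload added above has simpler semantics than the provider-based one; it suffixes the explicit name when present and otherwise returns the caller-provided fallback unchanged (e.g. the stream's own name for join repartition topics). An illustrative plain-Java sketch, not the Kafka class itself:

```java
// Illustrative sketch of NamedInternal.suffixWithOrElseGet(suffix, other):
// an explicitly named operation yields "name + suffix"; an unnamed one
// falls back to the supplied default string as-is.
class SuffixOrElse {
    static String suffixWithOrElseGet(final String name, final String suffix, final String other) {
        return name != null ? name + suffix : other;
    }

    public static void main(final String[] args) {
        System.out.println(suffixWithOrElseGet("join-name", "-left", "KSTREAM-MAP-0000000007"));
        System.out.println(suffixWithOrElseGet(null, "-left", "KSTREAM-MAP-0000000007"));
    }
}
```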
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 93d444b..49477b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -24,10 +24,16 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
@@ -39,9 +45,9 @@ import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Assert;
import org.junit.Test;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -59,6 +65,8 @@ public class StreamsBuilderTest {
private static final String STREAM_TOPIC = "stream-topic";
+ private static final String STREAM_OPERATION_NAME = "stream-operation";
+
private static final String STREAM_TOPIC_TWO = "stream-topic-two";
private static final String TABLE_TOPIC = "table-topic";
@@ -470,16 +478,243 @@ public class StreamsBuilderTest {
assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000002");
}
- private void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
+ @Test
+ public void shouldUseSpecifiedNameForMapOperation() {
+ builder.stream(STREAM_TOPIC).map(KeyValue::pair, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForMapValuesOperation() {
+ builder.stream(STREAM_TOPIC).mapValues(v -> v, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForMapValuesWithKeyOperation() {
+ builder.stream(STREAM_TOPIC).mapValues((k, v) -> v, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForFilterOperation() {
+ builder.stream(STREAM_TOPIC).filter((k, v) -> true, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForForEachOperation() {
+ builder.stream(STREAM_TOPIC).foreach((k, v) -> { }, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForTransform() {
+ builder.stream(STREAM_TOPIC).transform(() -> null, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseSpecifiedNameForTransformValues() {
+ builder.stream(STREAM_TOPIC).transformValues(() -> (ValueTransformer) null, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void shouldUseSpecifiedNameForTransformValuesWithKey() {
+ builder.stream(STREAM_TOPIC).transformValues(() -> (ValueTransformerWithKey) null, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void shouldUseSpecifiedNameForBranchOperation() {
+ builder.stream(STREAM_TOPIC)
+ .branch(Named.as("branch-processor"), (k, v) -> true, (k, v) -> false);
+
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology,
+ "KSTREAM-SOURCE-0000000000",
+ "branch-processor",
+ "branch-processor-predicate-0",
+ "branch-processor-predicate-1");
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKTable() {
+ final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+ final KTable<String, String> streamTwo = builder.table("table-topic");
+ streamOne.join(streamTwo, (value1, value2) -> value1, Joined.as(STREAM_OPERATION_NAME));
+ builder.build();
+
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology,
+ "KSTREAM-SOURCE-0000000000",
+ "KSTREAM-SOURCE-0000000002",
+ "KTABLE-SOURCE-0000000003",
+ STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKTable() {
+ final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+ final KTable<String, String> streamTwo = builder.table(STREAM_TOPIC_TWO);
+ streamOne.leftJoin(streamTwo, (value1, value2) -> value1, Joined.as(STREAM_OPERATION_NAME));
+ builder.build();
+
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology,
+ "KSTREAM-SOURCE-0000000000",
+ "KSTREAM-SOURCE-0000000002",
+ "KTABLE-SOURCE-0000000003",
+ STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKStream() {
+ final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+ final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
+
+ streamOne.leftJoin(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), Joined.as(STREAM_OPERATION_NAME));
+ builder.build();
+
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForStateStore(topology.stateStores(),
+ STREAM_OPERATION_NAME + "-this-join-store", STREAM_OPERATION_NAME + "-outer-other-join-store"
+ );
+ assertSpecifiedNameForOperation(topology,
+ "KSTREAM-SOURCE-0000000000",
+ "KSTREAM-SOURCE-0000000001",
+ STREAM_OPERATION_NAME + "-this-windowed",
+ STREAM_OPERATION_NAME + "-other-windowed",
+ STREAM_OPERATION_NAME + "-this-join",
+ STREAM_OPERATION_NAME + "-outer-other-join",
+ STREAM_OPERATION_NAME + "-merge");
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKStream() {
+ final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+ final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
+
+ streamOne.join(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), Joined.as(STREAM_OPERATION_NAME));
+ builder.build();
+
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForStateStore(topology.stateStores(),
+ STREAM_OPERATION_NAME + "-this-join-store",
+ STREAM_OPERATION_NAME + "-other-join-store"
+ );
+ assertSpecifiedNameForOperation(topology,
+ "KSTREAM-SOURCE-0000000000",
+ "KSTREAM-SOURCE-0000000001",
+ STREAM_OPERATION_NAME + "-this-windowed",
+ STREAM_OPERATION_NAME + "-other-windowed",
+ STREAM_OPERATION_NAME + "-this-join",
+ STREAM_OPERATION_NAME + "-other-join",
+ STREAM_OPERATION_NAME + "-merge");
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForOuterJoinOperationBetweenKStreamAndKStream() {
+ final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+ final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
+
+ streamOne.outerJoin(streamTwo, (value1, value2) -> value1, JoinWindows.of(Duration.ofHours(1)), Joined.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForStateStore(topology.stateStores(),
+ STREAM_OPERATION_NAME + "-outer-this-join-store",
+ STREAM_OPERATION_NAME + "-outer-other-join-store");
+ assertSpecifiedNameForOperation(topology,
+ "KSTREAM-SOURCE-0000000000",
+ "KSTREAM-SOURCE-0000000001",
+ STREAM_OPERATION_NAME + "-this-windowed",
+ STREAM_OPERATION_NAME + "-other-windowed",
+ STREAM_OPERATION_NAME + "-outer-this-join",
+ STREAM_OPERATION_NAME + "-outer-other-join",
+ STREAM_OPERATION_NAME + "-merge");
+
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForMergeOperation() {
+ final String topic1 = "topic-1";
+ final String topic2 = "topic-2";
+
+ final KStream<String, String> source1 = builder.stream(topic1);
+ final KStream<String, String> source2 = builder.stream(topic2);
+ source1.merge(source2, Named.as("merge-processor"));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", "KSTREAM-SOURCE-0000000001", "merge-processor");
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForProcessOperation() {
+ builder.stream(STREAM_TOPIC)
+ .process(() -> null, Named.as("test-processor"));
+
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-processor");
+ }
+
+ @Test
+ public void shouldUseSpecifiedNameForPrintOperation() {
+ builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor"));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", "print-processor");
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void shouldUseSpecifiedNameForFlatTransformValueOperation() {
+ builder.stream(STREAM_TOPIC).flatTransformValues(() -> (ValueTransformer) null, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void shouldUseSpecifiedNameForFlatTransformValueWithKeyOperation() {
+ builder.stream(STREAM_TOPIC).flatTransformValues(() -> (ValueTransformerWithKey) null, Named.as(STREAM_OPERATION_NAME));
+ builder.build();
+ final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+ assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ }
+
+ private static void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
final List<ProcessorNode> processors = topology.processors();
- Assert.assertEquals("Invalid number of expected processors", expected.length, processors.size());
+ assertEquals("Invalid number of expected processors", expected.length, processors.size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processors.get(i).name());
}
}
- private void assertSpecifiedNameForStateStore(final List<StateStore> stores, final String... expected) {
- Assert.assertEquals("Invalid number of expected state stores", expected.length, stores.size());
+ private static void assertSpecifiedNameForStateStore(final List<StateStore> stores, final String... expected) {
+ assertEquals("Invalid number of expected state stores", expected.length, stores.size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], stores.get(i).name());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 3c7e8c0..e621ffc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -550,44 +550,44 @@ public class RepartitionTopicNamingTest {
" --> KTABLE-TOSTREAM-0000000011\n" +
" <-- KSTREAM-SOURCE-0000000041\n" +
" Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
- " --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+ " --> joined-stream-other-windowed, KSTREAM-SINK-0000000012\n" +
" <-- KSTREAM-AGGREGATE-0000000007\n" +
" Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
" --> KSTREAM-PEEK-0000000021\n" +
" <-- KSTREAM-SOURCE-0000000041\n" +
" Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" +
- " --> KSTREAM-WINDOWED-0000000033\n" +
+ " --> joined-stream-this-windowed\n" +
" <-- KSTREAM-SOURCE-0000000041\n" +
" Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" +
" --> KSTREAM-REDUCE-0000000023\n" +
" <-- KSTREAM-FILTER-0000000020\n" +
- " Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
- " --> KSTREAM-JOINTHIS-0000000035\n" +
- " <-- KSTREAM-FILTER-0000000029\n" +
- " Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
- " --> KSTREAM-JOINOTHER-0000000036\n" +
+ " Processor: joined-stream-other-windowed (stores: [joined-stream-other-join-store])\n" +
+ " --> joined-stream-other-join\n" +
" <-- KTABLE-TOSTREAM-0000000011\n" +
+ " Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n" +
+ " --> joined-stream-this-join\n" +
+ " <-- KSTREAM-FILTER-0000000029\n" +
" Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" +
" --> KTABLE-TOSTREAM-0000000018\n" +
" <-- KSTREAM-SOURCE-0000000041\n" +
- " Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
- " --> KSTREAM-MERGE-0000000037\n" +
- " <-- KSTREAM-WINDOWED-0000000034\n" +
- " Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
- " --> KSTREAM-MERGE-0000000037\n" +
- " <-- KSTREAM-WINDOWED-0000000033\n" +
" Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" +
" --> KTABLE-TOSTREAM-0000000027\n" +
" <-- KSTREAM-PEEK-0000000021\n" +
- " Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
- " --> KSTREAM-SINK-0000000038\n" +
- " <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+ " Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n" +
+ " --> joined-stream-merge\n" +
+ " <-- joined-stream-other-windowed\n" +
+ " Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n" +
+ " --> joined-stream-merge\n" +
+ " <-- joined-stream-this-windowed\n" +
" Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" +
" --> KSTREAM-SINK-0000000019\n" +
" <-- KSTREAM-AGGREGATE-0000000014\n" +
" Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" +
" --> KSTREAM-SINK-0000000028\n" +
" <-- KSTREAM-REDUCE-0000000023\n" +
+ " Processor: joined-stream-merge (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000038\n" +
+ " <-- joined-stream-this-join, joined-stream-other-join\n" +
" Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
" <-- KTABLE-TOSTREAM-0000000011\n" +
" Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" +
@@ -595,7 +595,7 @@ public class RepartitionTopicNamingTest {
" Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" +
" <-- KTABLE-TOSTREAM-0000000027\n" +
" Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
- " <-- KSTREAM-MERGE-0000000037\n\n";
+ " <-- joined-stream-merge\n\n";
private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" +
@@ -651,29 +651,29 @@ public class RepartitionTopicNamingTest {
" --> KTABLE-TOSTREAM-0000000011\n" +
" <-- KSTREAM-SOURCE-0000000010\n" +
" Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
- " --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+ " --> KSTREAM-SINK-0000000012, joined-stream-other-windowed\n" +
" <-- KSTREAM-AGGREGATE-0000000007\n" +
" Source: KSTREAM-SOURCE-0000000032 (topics: [joined-stream-left-repartition])\n" +
- " --> KSTREAM-WINDOWED-0000000033\n" +
- " Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
- " --> KSTREAM-JOINTHIS-0000000035\n" +
- " <-- KSTREAM-SOURCE-0000000032\n" +
- " Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
- " --> KSTREAM-JOINOTHER-0000000036\n" +
+ " --> joined-stream-this-windowed\n" +
+ " Processor: joined-stream-other-windowed (stores: [joined-stream-other-join-store])\n" +
+ " --> joined-stream-other-join\n" +
" <-- KTABLE-TOSTREAM-0000000011\n" +
- " Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
- " --> KSTREAM-MERGE-0000000037\n" +
- " <-- KSTREAM-WINDOWED-0000000034\n" +
- " Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
- " --> KSTREAM-MERGE-0000000037\n" +
- " <-- KSTREAM-WINDOWED-0000000033\n" +
- " Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
+ " Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n" +
+ " --> joined-stream-this-join\n" +
+ " <-- KSTREAM-SOURCE-0000000032\n" +
+ " Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n" +
+ " --> joined-stream-merge\n" +
+ " <-- joined-stream-other-windowed\n" +
+ " Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n" +
+ " --> joined-stream-merge\n" +
+ " <-- joined-stream-this-windowed\n" +
+ " Processor: joined-stream-merge (stores: [])\n" +
" --> KSTREAM-SINK-0000000038\n" +
- " <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+ " <-- joined-stream-this-join, joined-stream-other-join\n" +
" Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
" <-- KTABLE-TOSTREAM-0000000011\n" +
" Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
- " <-- KSTREAM-MERGE-0000000037\n" +
+ " <-- joined-stream-merge\n" +
"\n" +
" Sub-topology: 2\n" +
" Source: KSTREAM-SOURCE-0000000017 (topics: [aggregate-stream-repartition])\n" +
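The updated topology strings above show the effect of KIP-307: nodes named through `Named.as` (e.g. `joined-stream-merge`) replace generated identifiers, while unnamed nodes keep the auto-generated form `PREFIX-<10-digit index>` (e.g. `KSTREAM-SOURCE-0000000041`). As a hedged illustration only (an assumption inferred from the generated names visible in this diff, not the actual `NamedInternal` implementation), the fallback naming can be sketched as:

```java
// Sketch: generate zero-padded default node names of the form seen in the
// topology descriptions above, e.g. "KSTREAM-SOURCE-0000000000".
public class GeneratedNames {
    private int index = 0;

    // Each call consumes the next topology-wide index, whether or not the
    // caller supplied an explicit name (which is why explicit names still
    // leave gaps in the numbering of the remaining generated names).
    String newProcessorName(final String prefix) {
        return String.format("%s-%010d", prefix, index++);
    }

    public static void main(final String[] args) {
        final GeneratedNames gen = new GeneratedNames();
        System.out.println(gen.newProcessorName("KSTREAM-SOURCE")); // KSTREAM-SOURCE-0000000000
        System.out.println(gen.newProcessorName("KSTREAM-SINK"));   // KSTREAM-SINK-0000000001
    }
}
```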