Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/08 19:08:02 UTC
[1/2] kafka git commit: MINOR: Update JavaDoc of KStream interface
Repository: kafka
Updated Branches:
refs/heads/trunk 600859e77 -> 1949a76bc
http://git-wip-us.apache.org/repos/asf/kafka/blob/1949a76b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 4483e9f..eb46311 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -14,641 +14,1705 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
/**
- * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs.
+ * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs,
+ * i.e., each record is an independent entity/event in the real world.
+ * For example, a user X might buy two items I1 and I2, and thus there might be two records {@code <X:I1>, <X:I2>}
+ * in the stream.
* <p>
* A {@link KStream} is either defined from one or multiple Kafka topics that are consumed message by message or
- * the result of a {@link KStream} transformation. A {@link KTable} can also be converted into a {@link KStream}.
+ * the result of a {@link KStream} transformation.
+ * A {@link KTable} can also be converted into a {@link KStream}.
* <p>
* A {@link KStream} can be transformed record by record, joined with another {@link KStream} or {@link KTable}, or
* can be aggregated into a {@link KTable}.
+ * The Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf.
+ * {@link org.apache.kafka.streams.processor.TopologyBuilder TopologyBuilder}) via
+ * {@link #process(ProcessorSupplier, String...) process(...)},
+ * {@link #transform(TransformerSupplier, String...) transform(...)}, and
+ * {@link #transformValues(ValueTransformerSupplier, String...) transformValues(...)}.
*
* @param <K> Type of keys
* @param <V> Type of values
- *
* @see KTable
*/
+@SuppressWarnings("unused")
@InterfaceStability.Unstable
public interface KStream<K, V> {
/**
- * Create a new instance of {@link KStream} that consists of all elements of this stream which satisfy a predicate.
- *
- * @param predicate the instance of {@link Predicate}
+ * Create a new {@link KStream} that consists of all records of this stream which satisfy a predicate.
+ * All records that do not satisfy the predicate are dropped. This is a stateless record-by-record operation.
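+ * <p>
+ * For example, a minimal sketch (assuming a {@code KStream<String, Long> stream}) that keeps only records with
+ * positive values:
+ * <pre>{@code
+ * KStream<String, Long> onlyPositives = stream.filter(new Predicate<String, Long>() {
+ *     public boolean test(String key, Long value) {
+ *         return value > 0; // records with value <= 0 are dropped
+ *     }
+ * });
+ * }</pre>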
*
+ * @param predicate a filter {@link Predicate} that is applied to each record
* @return a {@link KStream} that contains only those records that satisfy the given predicate
+ * @see #filterNot(Predicate)
*/
- KStream<K, V> filter(Predicate<K, V> predicate);
+ KStream<K, V> filter(final Predicate<K, V> predicate);
/**
- * Create a new instance of {@link KStream} that consists all elements of this stream which do not satisfy a predicate.
+ * Create a new {@link KStream} that consists of all records of this stream which do <em>not</em> satisfy a predicate.
+ * All records that <em>do</em> satisfy the predicate are dropped. This is a stateless record-by-record operation.
*
- * @param predicate the instance of {@link Predicate}
- *
- * @return a {@link KStream} that contains only those records that do not satisfy the given predicate
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @return a {@link KStream} that contains only those records that do <em>not</em> satisfy the given predicate
+ * @see #filter(Predicate)
*/
- KStream<K, V> filterNot(Predicate<K, V> predicate);
-
+ KStream<K, V> filterNot(final Predicate<K, V> predicate);
/**
- * Create a new key from the current key and value.
- *
- * @param mapper the instance of {@link KeyValueMapper}
- * @param <K1> the new key type on the stream
- *
- * @return a {@link KStream} that contains records with different key type and same value type
+ * Set a new key (with possibly new type) for each input record.
+ * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} by
+ * extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
+ * length of the value string.
+ * <pre>{@code
+ * KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
+ * KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer>() {
+ *     public Integer apply(Byte[] key, String value) {
+ *         return value.length();
+ *     }
+ * });
+ * }</pre>
+ * <p>
+ * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
+ * join) is applied to the result {@link KStream}.
+ *
+ * @param mapper a {@link KeyValueMapper} that computes a new key for each record
+ * @param <KR> the new key type of the result stream
+ * @return a {@link KStream} that contains records with new key (possibly of different type) and unmodified value
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapper)
*/
- <K1> KStream<K1, V> selectKey(KeyValueMapper<K, V, K1> mapper);
+ <KR> KStream<KR, V> selectKey(final KeyValueMapper<K, V, KR> mapper);
/**
- * Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream.
- *
- * @param mapper the instance of {@link KeyValueMapper}
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- *
- * @return a {@link KStream} that contains records with new key and value type
+ * Transform each record of the input stream into a new record in the output stream
+ * (both key and value type can be altered arbitrarily).
+ * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * The example below normalizes the String key to upper-case letters and counts the number of tokens in the value
+ * string.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
+ *     public KeyValue<String, Integer> apply(String key, String value) {
+ *         return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
+ *     }
+ * });
+ * }</pre>
+ * <p>
+ * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and the return value must not be {@code null}.
+ * <p>
+ * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
+ * join) is applied to the result {@link KStream}. (cf. {@link #mapValues(ValueMapper)})
+ *
+ * @param mapper a {@link KeyValueMapper} that computes a new output record
+ * @param <KR> the key type of the result stream
+ * @param <VR> the value type of the result stream
+ * @return a {@link KStream} that contains records with new key and value (possibly both of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapper)
*/
- <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
+ <KR, VR> KStream<KR, VR> map(final KeyValueMapper<K, V, KeyValue<KR, VR>> mapper);
/**
- * Create a new instance of {@link KStream} by transforming the value of each element in this stream into a new value in the new stream.
- *
- * @param mapper the instance of {@link ValueMapper}
- * @param <V1> the value type of the new stream
- *
- * @return a {@link KStream} that contains records with unmodified keys and new values of different type
+ * Transform the value of each input record into a new value (with possible new type) of the output record.
+ * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}).
+ * <p>
+ * The example below counts the number of tokens in the value string.
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
+ *     public Integer apply(String value) {
+ *         return value.split(" ").length;
+ *     }
+ * });
+ * }</pre>
+ * <p>
+ * Setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@link KStream}. (cf. {@link #map(KeyValueMapper)})
+ *
+ * @param mapper a {@link ValueMapper} that computes a new output value
+ * @param <VR> the value type of the result stream
+ * @return a {@link KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #flatMapValues(ValueMapper)
+ * @see #transformValues(ValueTransformerSupplier, String...)
*/
- <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
+ <VR> KStream<K, VR> mapValues(final ValueMapper<V, VR> mapper);
/**
- * Print the elements of this stream to {@code System.out}. This function
- * will use the generated name of the parent processor node to label the key/value pairs
- * printed out to the console.
- *
- * Implementors will need to override toString for keys and values that are not of
- * type String, Integer etc to get meaningful information.
+ * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+ * can be altered arbitrarily).
+ * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
+ * This is a stateless record-by-record operation (cf. {@link #transform(TransformerSupplier, String...)}).
+ * <p>
+ * The example below splits input records {@code <null:String>} containing sentences as values into their words
+ * and emits a record {@code <word:1>} for each word.
+ * <pre>{@code
+ * KStream<byte[], String> inputStream = builder.stream("topic");
+ * KStream<String, Integer> outputStream = inputStream.flatMap(new KeyValueMapper<byte[], String, Iterable<KeyValue<String, Integer>>>() {
+ *     public Iterable<KeyValue<String, Integer>> apply(byte[] key, String value) {
+ *         String[] tokens = value.split(" ");
+ *         List<KeyValue<String, Integer>> result = new ArrayList<>(tokens.length);
+ *
+ *         for (String token : tokens) {
+ *             result.add(new KeyValue<>(token, 1));
+ *         }
+ *
+ *         return result;
+ *     }
+ * });
+ * }</pre>
+ * <p>
+ * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+ * and the return value must not be {@code null}.
+ * <p>
+ * Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation
+ * or join) is applied to the result {@link KStream}. (cf. {@link #flatMapValues(ValueMapper)})
+ *
+ * @param mapper a {@link KeyValueMapper} that computes the new output records
+ * @param <KR> the key type of the result stream
+ * @param <VR> the value type of the result stream
+ * @return a {@link KStream} that contains more or fewer records with new key and value (possibly of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
+ * @see #flatMapValues(ValueMapper)
+ * @see #transform(TransformerSupplier, String...)
*/
- void print();
+ <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<K, V, Iterable<KeyValue<KR, VR>>> mapper);
/**
- * Print the elements of this stream to {@code System.out}. This function
- * will use the given name to label the key/value printed out to the console.
- *
- * @param streamName the name used to label the key/value pairs printed out to the console
- *
- * Implementors will need to override toString for keys and values that are not of
- * type String, Integer etc to get meaningful information.
+ * Transform the value of each input record into zero or more records with the same (unmodified) key in the output
+ * stream (value type can be altered arbitrarily).
+ * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
+ * This is a stateless record-by-record operation.
+ * <p>
+ * The example below splits input records {@code <null:String>} containing sentences as values into their words.
+ * <pre>{@code
+ * KStream<byte[], String> inputStream = builder.stream("topic");
+ * KStream<byte[], String> outputStream = inputStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
+ *     public Iterable<String> apply(String value) {
+ *         return Arrays.asList(value.split(" "));
+ *     }
+ * });
+ * }</pre>
+ * <p>
+ * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
+ * and the return value must not be {@code null}.
+ * <p>
+ * Splitting a record into multiple records with the same key preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@link KStream}. (cf. {@link #flatMap(KeyValueMapper)})
+ *
+ * @param processor a {@link ValueMapper} that computes the new output values
+ * @param <VR> the value type of the result stream
+ * @return a {@link KStream} that contains more or fewer records with unmodified keys and new values (possibly of different type)
+ * @see #selectKey(KeyValueMapper)
+ * @see #map(KeyValueMapper)
+ * @see #flatMap(KeyValueMapper)
+ * @see #mapValues(ValueMapper)
*/
- void print(String streamName);
-
+ <VR> KStream<K, VR> flatMapValues(final ValueMapper<V, Iterable<VR>> processor);
/**
- * Print the elements of this stream to System.out. This function
- * will use the generated name of the parent processor node to label the key/value pairs
- * printed out to the console.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- *
- * Implementors will need to override toString for keys and values that are not of
- * type String, Integer etc to get meaningful information.
+ * Print the elements of this stream to {@code System.out}.
+ * This function will use the generated name of the parent processor node to label the key/value pairs printed out
+ * to the console.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
*/
- void print(Serde<K> keySerde, Serde<V> valSerde);
+ void print();
/**
- * Print the elements of this stream to System.out
+ * Print the elements of this stream to {@code System.out}.
+ * This function will use the given name to label the key/value pairs printed out to the console.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
* @param streamName the name used to label the key/value pairs printed out to the console
- *
- * Implementors will need to override {@code toString} for keys and values that are not of
- * type {@link String}, {@link Integer} etc. to get meaningful information.
*/
- void print(Serde<K> keySerde, Serde<V> valSerde, String streamName);
-
+ void print(final String streamName);
/**
- * Write the elements of this stream to a file at the given path.
- *
- * @param filePath name of file to write to
- *
- * Implementors will need to override {@code toString} for keys and values that are not of
- * type {@link String}, {@link Integer} etc. to get meaningful information.
+ * Print the elements of this stream to {@code System.out}.
+ * This function will use the generated name of the parent processor node to label the key/value pairs printed out
+ * to the console.
+ * <p>
+ * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+ * {@code toString()} on the deserialized object.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
+ *
+ * @param keySerde key serde used to deserialize the key if its type is {@code byte[]}
+ * @param valSerde value serde used to deserialize the value if its type is {@code byte[]}
*/
- void writeAsText(String filePath);
-
+ void print(final Serde<K> keySerde,
+ final Serde<V> valSerde);
/**
- * Write the elements of this stream to a file at the given path.
- *
- * @param filePath name of file to write to
+ * Print the elements of this stream to {@code System.out}.
+ * <p>
+ * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+ * {@code toString()} on the deserialized object.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
+ *
+ * @param keySerde key serde used to deserialize the key if its type is {@code byte[]}
+ * @param valSerde value serde used to deserialize the value if its type is {@code byte[]}
* @param streamName the name used to label the key/value pairs printed out to the console
- *
- * Implementors will need to override {@code toString} for keys and values that are not of
- * type {@link String}, {@link Integer} etc. to get meaningful information.
*/
- void writeAsText(String filePath, String streamName);
+ void print(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String streamName);
/**
- * @param filePath name of file to write to
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
+ * Write the elements of this stream to a file at the given path.
+ * This function will use the generated name of the parent processor node to label the key/value pairs printed out
+ * to the file.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
*
- * Implementors will need to override {@code toString} for keys and values that are not of
- * type {@link String}, {@link Integer} etc. to get meaningful information.
+ * @param filePath name of the file to write to
*/
-
- void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+ void writeAsText(final String filePath);
/**
- * @param filePath name of file to write to
- * @param streamName the name used to label the key/value pairs printed out to the console
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
+ * Write the elements of this stream to a file at the given path.
+ * This function will use the given name to label the key/value pairs written to the file.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
*
- * Implementors will need to override {@code toString} for keys and values that are not of
- * type {@link String}, {@link Integer} etc. to get meaningful information.
+ * @param filePath name of the file to write to
+ * @param streamName the name used to label the key/value pairs written to the file
*/
-
- void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde);
+ void writeAsText(final String filePath,
+ final String streamName);
/**
- * Create a new instance of {@link KStream} by transforming each element in this stream into zero or more elements in the new stream.
- *
- * @param mapper the instance of {@link KeyValueMapper}
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- *
- * @return a {@link KStream} that contains more or less records with new key and value type
+ * Write the elements of this stream to a file at the given path.
+ * This function will use the generated name of the parent processor node to label the key/value pairs printed out
+ * to the file.
+ * <p>
+ * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+ * {@code toString()} on the deserialized object.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
+ *
+ * @param filePath name of the file to write to
+ * @param keySerde key serde used to deserialize the key if its type is {@code byte[]}
+ * @param valSerde value serde used to deserialize the value if its type is {@code byte[]}
*/
- <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
+ void writeAsText(final String filePath,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde);
/**
- * Create a new instance of {@link KStream} by transforming the value of each element in this stream into zero or more values with the same key in the new stream.
- *
- * @param processor the instance of {@link ValueMapper}
- * @param <V1> the value type of the new stream
- *
- * @return a {@link KStream} that contains more or less records with unmodified keys and new values of different type
+ * Write the elements of this stream to a file at the given path.
+ * This function will use the given name to label the key/value pairs written to the file.
+ * <p>
+ * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]}
+ * before calling {@code toString()} on the deserialized object.
+ * <p>
+ * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+ * {@link Integer} etc. to get meaningful information.
+ *
+ * @param filePath name of the file to write to
+ * @param streamName the name used to label the key/value pairs written to the file
+ * @param keySerde key serde used to deserialize the key if its type is {@code byte[]}
+ * @param valSerde value serde used to deserialize the value if its type is {@code byte[]}
*/
- <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
+ void writeAsText(final String filePath,
+ final String streamName,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde);
/**
- * Creates an array of {@link KStream} from this stream by branching the elements in the original stream based on the supplied predicates.
- * Each element is evaluated against the supplied predicates, and predicates are evaluated in order. Each stream in the result array
- * corresponds position-wise (index) to the predicate in the supplied predicates. The branching happens on first-match: An element
- * in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and
- * assigned to this stream only. An element will be dropped if none of the predicates evaluate to true.
- *
- * @param predicates the ordered list of {@link Predicate} instances
+ * Perform an action on each record of {@link KStream}.
+ * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
+ * Note that this is a terminal operation that returns void.
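+ * <p>
+ * A minimal sketch (assuming a {@code KStream<String, Long> stream}):
+ * <pre>{@code
+ * stream.foreach(new ForeachAction<String, Long>() {
+ *     public void apply(String key, Long value) {
+ *         System.out.println(key + ": " + value); // side effect only; nothing is forwarded downstream
+ *     }
+ * });
+ * }</pre>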
*
- * @return multiple distinct substreams of this {@link KStream}
+ * @param action an action to perform on each record
+ * @see #process(ProcessorSupplier, String...)
*/
- KStream<K, V>[] branch(Predicate<K, V>... predicates);
+ void foreach(final ForeachAction<K, V> action);
/**
- * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
- * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
- * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
- *
- * @param topic the topic name
- *
- * @return a {@link KStream} that contains the exact same records as this {@link KStream}
+ * Create an array of {@link KStream} from this stream by branching the records in the original stream based on
+ * the supplied predicates.
+ * Each record is evaluated against the supplied predicates, and predicates are evaluated in order.
+ * Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates.
+ * The branching happens on first-match: A record in the original stream is assigned to the corresponding result
+ * stream for the first predicate that evaluates to true, and is assigned to this stream only.
+ * A record will be dropped if none of the predicates evaluate to true.
+ * This is a stateless record-by-record operation.
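+ * <p>
+ * A minimal sketch (assuming a {@code KStream<String, Integer> stream}); because branching is first-match, the
+ * second branch only receives records with {@code value <= 100}:
+ * <pre>{@code
+ * KStream<String, Integer>[] branches = stream.branch(
+ *     new Predicate<String, Integer>() {
+ *         public boolean test(String key, Integer value) {
+ *             return value > 100; // first branch: large values
+ *         }
+ *     },
+ *     new Predicate<String, Integer>() {
+ *         public boolean test(String key, Integer value) {
+ *             return true; // second branch: all remaining records
+ *         }
+ *     });
+ * }</pre>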
+ *
+ * @param predicates the ordered list of {@link Predicate} instances
+ * @return multiple distinct substreams of this {@link KStream}
*/
- KStream<K, V> through(String topic);
+ @SuppressWarnings("unchecked")
+ KStream<K, V>[] branch(final Predicate<K, V>... predicates);
/**
- * Perform an action on each element of {@link KStream}.
- * Note that this is a terminal operation that returns void.
- *
- * @param action an action to perform on each element
+ * Materialize this stream to a topic and create a new instance of {@link KStream} from the topic using default
+ * serializers and deserializers and producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner DefaultPartitioner}.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+ * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...) KStreamBuilder#stream(someTopicName)}.
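+ * <p>
+ * For example (a minimal sketch, assuming a {@code KStream<String, String> stream}; the intermediate topic is
+ * assumed to exist):
+ * <pre>{@code
+ * KStream<String, String> viaTopic = stream.through("intermediate-topic");
+ * }</pre>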
+ *
+ * @param topic the topic name
+ * @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
- void foreach(ForeachAction<K, V> action);
+ KStream<K, V> through(final String topic);
/**
- * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
- * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
- * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
- *
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
- * @param topic the topic name
- *
+ * Materialize this stream to a topic and create a new instance of {@link KStream} from the topic using default
+ * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
+ * records to partitions.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(StreamPartitioner, someTopicName)} and
+ * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...) KStreamBuilder#stream(someTopicName)}.
+ *
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner DefaultPartitioner}
+ * will be used
+ * @param topic the topic name
* @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
- KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+ KStream<K, V> through(final StreamPartitioner<K, V> partitioner,
+ final String topic);
/**
- * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic.
- * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
- * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
- * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
- * This is equivalent to calling {@link #to(Serde, Serde, String)} and
- * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name
+ * Materialize this stream to a topic and create a new instance of {@link KStream} from the topic.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer
+ * WindowedSerializer} for the key, a {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner
+ * WindowedStreamPartitioner} is used; otherwise the producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner DefaultPartitioner} is used.
+ * <p>
+ * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valSerde, someTopicName)} and
+ * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)
+ * KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
*
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name
* @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
- KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
+ KStream<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic);
/**
- * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
+ * Materialize this stream to a topic and create a new instance of {@link KStream} from the topic
* using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
- * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
- * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
- * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
- * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
- * @param topic the topic name
- *
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) #to(keySerde, valSerde,
+ * StreamPartitioner, someTopicName)} and
+ * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)
+ * KStreamBuilder#stream(keySerde, valSerde, someTopicName)}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer WindowedSerializer} for
+ * the key, a {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner
+ * WindowedStreamPartitioner} will be used; otherwise
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner DefaultPartitioner} will
+ * be used
+ * @param topic the topic name
* @return a {@link KStream} that contains the exact same records as this {@link KStream}
*/
- KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+ KStream<K, V> through(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final StreamPartitioner<K, V> partitioner,
+ final String topic);
/**
- * Materialize this stream to a topic using default serializers specified in the config
- * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+ * Materialize this stream to a topic using default serializers specified in the config and producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner DefaultPartitioner}.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
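+ * <p>
+ * For example (a minimal sketch; "output-topic" is assumed to exist):
+ * <pre>{@code
+ * stream.to("output-topic");
+ * }</pre>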
*
- * @param topic the topic name
+ * @param topic the topic name
*/
- void to(String topic);
+ void to(final String topic);
/**
* Materialize this stream to a topic using default serializers specified in the config and a customizable
* {@link StreamPartitioner} to determine the distribution of records to partitions.
- *
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
- * @param topic the topic name
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
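+ * <p>
+ * A sketch of a custom partitioner (hypothetical routing logic, assuming a {@code KStream<String, String> stream};
+ * the sign bit is masked to avoid negative partition numbers):
+ * <pre>{@code
+ * stream.to(new StreamPartitioner<String, String>() {
+ *     public Integer partition(String key, String value, int numPartitions) {
+ *         return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
+ *     }
+ * }, "output-topic");
+ * }</pre>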
+ *
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner DefaultPartitioner} will
+ * be used
+ * @param topic the topic name
*/
- void to(StreamPartitioner<K, V> partitioner, String topic);
+ void to(final StreamPartitioner<K, V> partitioner,
+ final String topic);
/**
* Materialize this stream to a topic. If {@code keySerde} provides a
- * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
- * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
- * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer WindowedSerializer} for the key, a
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner WindowedStreamPartitioner} is
+ * used; otherwise the producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner
+ * DefaultPartitioner} is used.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param topic the topic name
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param topic the topic name
*/
- void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
+ void to(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic);
/**
- * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param partitioner the function used to determine how records are distributed among partitions of the topic,
- * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
- * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
- * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
- * @param topic the topic name
+ * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution
+ * of records to partitions.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer WindowedSerializer} for
+ * the key, a {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner
+ * WindowedStreamPartitioner} will be used; otherwise
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner DefaultPartitioner} will
+ * be used
+ * @param topic the topic name
*/
- void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+ void to(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final StreamPartitioner<K, V> partitioner,
+ final String topic);
/**
- * Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.Transformer} to all elements in this stream, one element at a time.
- *
- * @param transformerSupplier the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer}
- * @param stateStoreNames the names of the state store used by the processor
- *
- * @return a new {@link KStream} with transformed key and value types
+ * Transform each record of the input stream into zero or more records in the output stream (both key and value type
+ * can be altered arbitrarily).
+ * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
+ * computes zero or more output records.
+ * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K'':V''>, ...}.
+ * This is a stateful record-by-record operation (cf. {@link #flatMap(KeyValueMapper)}).
+ * Furthermore, via {@link Transformer#punctuate(long)} the processing progress can be observed and additional
+ * periodic actions can be performed.
+ * <p>
+ * In order to use a state store, the store must be created and registered beforehand:
+ * <pre>{@code
+ * // create store
+ * StateStoreSupplier myStore = Stores.create("myTransformState")
+ *     .withKeys(...)
+ *     .withValues(...)
+ *     .persistent() // optional
+ *     .build();
+ *
+ * // register store
+ * builder.addStateStore(myStore);
+ *
+ * KStream outputStream = inputStream.transform(new TransformerSupplier() { ... }, "myTransformState");
+ * }</pre>
+ * <p>
+ * Within the {@link Transformer}, the state is obtained via the
+ * {@link org.apache.kafka.streams.processor.ProcessorContext ProcessorContext}.
+ * To trigger periodic actions via {@link Transformer#punctuate(long) punctuate()}, a schedule must be registered.
+ * The {@link Transformer} must return a {@link KeyValue} type in {@link Transformer#transform(Object, Object)
+ * transform()} and {@link Transformer#punctuate(long) punctuate()}.
+ * <pre>{@code
+ * new TransformerSupplier() {
+ * Transformer get() {
+ * return new Transformer() {
+ * private ProcessorContext context;
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.context = context;
+ * this.state = context.getStateStore("myTransformState");
+ * context.schedule(1000); // call #punctuate() each 1000ms
+ * }
+ *
+ * KeyValue transform(K key, V value) {
+ * // can access this.state
+ * // can emit as many new KeyValue pairs as required via this.context#forward()
+ * return new KeyValue(key, value); // can emit a single value via return -- can also be null
+ * }
+ *
+ * KeyValue punctuate(long timestamp) {
+ * // can access this.state
+ * // can emit as many new KeyValue pairs as required via this.context#forward()
+ * return null; // don't return result -- can also be "new KeyValue()"
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * // can emit as many new KeyValue pairs as required via this.context#forward()
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * <p>
+ * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
+ * or join) is applied to the result {@link KStream}.
+ * (cf. {@link #transformValues(ValueTransformerSupplier, String...)})
+ *
+ * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <K1> the key type of the new stream
+ * @param <V1> the value type of the new stream
+ * @return a {@link KStream} that contains more or fewer records with new key and value (possibly of different type)
+ * @see #flatMap(KeyValueMapper)
+ * @see #transformValues(ValueTransformerSupplier, String...)
+ * @see #process(ProcessorSupplier, String...)
*/
- <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
+ <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier,
+ final String... stateStoreNames);
/**
- * Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.ValueTransformer} to all values in this stream, one element at a time.
- *
- * @param valueTransformerSupplier the instance of {@link ValueTransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.ValueTransformer}
- * @param stateStoreNames the names of the state store used by the processor
- *
- * @return a {@link KStream} that contains records with unmodified keys and transformed values with type {@code R}
+ * Transform the value of each input record into a new value (with possible new type) of the output record.
+ * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
+ * record value and computes a new value for it.
+ * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
+ * This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
+ * Furthermore, via {@link ValueTransformer#punctuate(long)} the processing progress can be observed and additional
+ * periodic actions can be performed.
+ * <p>
+ * In order to use a state store, the store must be created and registered beforehand:
+ * <pre>{@code
+ * // create store
+ * StateStoreSupplier myStore = Stores.create("myValueTransformState")
+ *     .withKeys(...)
+ *     .withValues(...)
+ *     .persistent() // optional
+ *     .build();
+ *
+ * // register store
+ * builder.addStateStore(myStore);
+ *
+ * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
+ * }</pre>
+ * <p>
+ * Within the {@link ValueTransformer}, the state is obtained via the
+ * {@link org.apache.kafka.streams.processor.ProcessorContext ProcessorContext}.
+ * To trigger periodic actions via {@link ValueTransformer#punctuate(long) punctuate()}, a schedule must be
+ * registered.
+ * In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
+ * pairs should be emitted via {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
+ * ProcessorContext.forward()}.
+ * <pre>{@code
+ * new ValueTransformerSupplier() {
+ * ValueTransformer get() {
+ * return new ValueTransformer() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myValueTransformState");
+ * context.schedule(1000); // call #punctuate() each 1000ms
+ * }
+ *
+ * NewValueType transform(V value) {
+ * // can access this.state
+ * return new NewValueType(); // or null
+ * }
+ *
+ * NewValueType punctuate(long timestamp) {
+ * // can access this.state
+ * return null; // don't return result -- can also be "new NewValueType()" (current key will be used to build KeyValue pair)
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ * <p>
+ * Setting a new value preserves data co-location with respect to the key.
+ * Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
+ * is applied to the result {@link KStream}. (cf. {@link #transform(TransformerSupplier, String...)})
+ *
+ * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
+ * {@link ValueTransformer}
+ * @param stateStoreNames the names of the state stores used by the processor
+ * @param <VR> the value type of the result stream
+ * @return a {@link KStream} that contains records with unmodified key and new values (possibly of different type)
+ * @see #mapValues(ValueMapper)
+ * @see #transform(TransformerSupplier, String...)
+ */
+ <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<V, VR> valueTransformerSupplier,
+ final String... stateStoreNames);
+
+ /**
+ * Process all elements in this stream, one element at a time, by applying a
+ * {@link org.apache.kafka.streams.processor.Processor Processor} (provided by the given {@link ProcessorSupplier}).
+ * This is a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
+ * Furthermore, via {@link org.apache.kafka.streams.processor.Processor#punctuate(long) Processor.punctuate(long)}
+ * the processing progress can be observed and additional periodic actions can be performed.
+ * Note that this is a terminal operation that returns void.
+ * <p>
+ * In order to use a state store, the store must be created and registered beforehand:
+ * <pre>{@code
+ * // create store
+ * StateStoreSupplier myStore = Stores.create("myProcessorState")
+ *     .withKeys(...)
+ *     .withValues(...)
+ *     .persistent() // optional
+ *     .build();
+ *
+ * // register store
+ * builder.addStateStore(myStore);
+ *
+ * inputStream.process(new ProcessorSupplier() { ... }, "myProcessorState");
+ * }</pre>
+ * <p>
+ * Within the {@link org.apache.kafka.streams.processor.Processor Processor}, the state is obtained via the
+ * {@link org.apache.kafka.streams.processor.ProcessorContext ProcessorContext}.
+ * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Processor#punctuate(long) punctuate()},
+ * a schedule must be registered.
+ * <pre>{@code
+ * new ProcessorSupplier() {
+ * Processor get() {
+ * return new Processor() {
+ * private StateStore state;
+ *
+ * void init(ProcessorContext context) {
+ * this.state = context.getStateStore("myProcessorState");
+ * context.schedule(1000); // call #punctuate() each 1000ms
+ * }
+ *
+ * void process(K key, V value) {
+ * // can access this.state
+ * }
+ *
+ * void punctuate(long timestamp) {
+ * // can access this.state
+ * }
+ *
+ * void close() {
+ * // can access this.state
+ * }
+ * }
+ * }
+ * }
+ * }</pre>
+ *
+ * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a
+ *                          {@link org.apache.kafka.streams.processor.Processor}
+ * @param stateStoreNames   the names of the state stores used by the processor
+ * @see #foreach(ForeachAction)
+ * @see #transform(TransformerSupplier, String...)
*/
- <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
+ void process(final ProcessorSupplier<K, V> processorSupplier,
+ final String... stateStoreNames);
/**
- * Process all elements in this stream, one element at a time, by applying a {@link org.apache.kafka.streams.processor.Processor}.
+ * Group the records of this {@link KStream} on a new key that is selected using the provided {@link KeyValueMapper}.
+ * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+ * (cf. {@link KGroupedStream}).
+ * The {@link KeyValueMapper} selects a new key (with potentially different type) while preserving the original values.
+ * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+ * <p>
+ * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
+ * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+ * {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internal
+ * generated name, and "-repartition" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * <p>
+ * All data of this stream will be redistributed through the repartitioning topic by writing all records to
+ * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
+ * <p>
+ * This is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey(Serde, Serde)}.
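+ * <p>
+ * A sketch that re-groups a stream by a hypothetical attribute of the value (assuming a
+ * {@code KStream<String, Order> orders} and a matching {@code Serde<Order> orderSerde}):
+ * <pre>{@code
+ * KGroupedStream<String, Order> byCustomer = orders.groupBy(
+ *     new KeyValueMapper<String, Order, String>() {
+ *         public String apply(String key, Order order) {
+ *             return order.customerId(); // group by customer instead of the original key
+ *         }
+ *     },
+ *     Serdes.String(),
+ *     orderSerde);
+ * }</pre>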
+ *
+ * @param selector a {@link KeyValueMapper} that computes a new key for grouping
+ * @param keySerde key serde used for materializing this stream,
+ *                 if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used for materializing this stream,
+ *                 if not specified the default serde defined in the configs will be used
+ * @param <KR> the key type of the result {@link KGroupedStream}
+ * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
+ * @see #groupByKey()
+ */
+ <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<K, V, KR> selector,
+ final Serde<KR> keySerde,
+ final Serde<V> valSerde);
+
+ /**
+ * Group the records of this {@link KStream} on a new key that is selected using the provided {@link KeyValueMapper}
+ * and default serializers and deserializers.
+ * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+ * (cf. {@link KGroupedStream}).
+ * The {@link KeyValueMapper} selects a new key (which should be of the same type) while preserving the original values.
+ * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+ * <p>
+ * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
+ * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+ * {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internal
+ * generated name, and "-repartition" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * <p>
+ * All data of this stream will be redistributed through the repartitioning topic by writing all records to
+ * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
+ * <p>
+ * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
+ * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serde, Serde)} instead.
+ *
+ * @param selector a {@link KeyValueMapper} that computes a new key for grouping
+ * @param <KR> the key type of the result {@link KGroupedStream}
+ * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
+ */
+ <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<K, V, KR> selector);
+
+ /**
+ * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
+ * and default serializers and deserializers.
+ * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+ * (cf. {@link KGroupedStream}).
+ * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+ * <p>
+ * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
+ * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
+ * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
+ * {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
+ * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+ * {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internal
+ * generated name, and "-repartition" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * <p>
+ * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
+ * records to and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
+ * correctly on its key.
+ * If the last key changing operator changed the key type, it is recommended to use
+ * {@link #groupByKey(Serde, Serde)} instead.
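+ * <p>
+ * For example (a minimal sketch; the {@code textLines} stream and the word-splitting steps are assumptions for
+ * illustration):
+ * <pre>{@code
+ * KStream<String, String> textLines = ...;
+ * KGroupedStream<String, String> groupedWords = textLines
+ *     .flatMapValues(line -> Arrays.asList(line.split(" ")))
+ *     .map((key, word) -> new KeyValue<>(word, word)) // key changing operation
+ *     .groupByKey();                                  // thus, groupByKey() triggers a repartitioning here
+ * }</pre>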
*
- * @param processorSupplier the supplier of {@link ProcessorSupplier} that generates {@link org.apache.kafka.streams.processor.Processor}
- * @param stateStoreNames the names of the state store used by the processor
+ * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
+ * @see #groupBy(KeyValueMapper)
*/
- void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
+ KGroupedStream<K, V> groupByKey();
/**
- * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
- * If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
- * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
- * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
- * {@link org.apache.kafka.streams.StreamsConfig}.
- *
- * @param otherStream the instance of {@link KStream} joined with this stream
- * @param joiner the instance of {@link ValueJoiner}
- * @param windows the specification of the {@link JoinWindows}
- * @param keySerde key serdes for materializing both streams,
- * if not specified the default serdes defined in the configs will be used
- * @param thisValueSerde value serdes for materializing this stream,
- * if not specified the default serdes defined in the configs will be used
- * @param otherValueSerde value serdes for materializing the other stream,
- * if not specified the default serdes defined in the configs will be used
- * @param <V1> the value type of the other stream
- * @param <R> the value type of the new stream
- *
- * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
- * one for each matched record-pair with the same key and within the joining window intervals
- */
- <V1, R> KStream<K, R> join(
- KStream<K, V1> otherStream,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValueSerde,
- Serde<V1> otherValueSerde);
-
- /**
- * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
- * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
+ * Group the records by their current key into a {@link KGroupedStream} while preserving the original values.
+ * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+ * (cf. {@link KGroupedStream}).
+ * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+ * <p>
+ * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
+ * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
+ * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
+ * {@link #through(String)}), an internal repartitioning topic will be created in Kafka.
+ * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+ * {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally
+ * generated name, and "-repartition" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * <p>
+ * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
+ * records to and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
+ * correctly on its key.
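+ * <p>
+ * For example (a minimal sketch; the upstream {@code map(...)} step and the record fields are assumptions for
+ * illustration; {@code Serdes} refers to {@code org.apache.kafka.common.serialization.Serdes}):
+ * <pre>{@code
+ * KGroupedStream<String, Long> grouped = stream
+ *     .map((key, value) -> new KeyValue<>(value.name, value.count)) // key and value types changed
+ *     .groupByKey(Serdes.String(), Serdes.Long());                  // thus, explicit serdes are passed
+ * }</pre>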
+ *
+ * @param keySerde key serdes for materializing this stream,
+ * if not specified the default serdes defined in the configs will be used
+ * @param valSerde value serdes for materializing this stream,
+ * if not specified the default serdes defined in the configs will be used
+ * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
+ */
+ KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
+ final Serde<V> valSerde);
+
+ /**
+ * Join records of this stream with another {@link KStream}'s records using windowed inner equi join with default
+ * serializers and deserializers.
+ * The join is a primary key join with join attribute {@code thisKStream.key == otherKStream.key}.
+ * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
+ * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
+ * <p>
+ * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute
+ * a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * If an input record key or value is {@code null} the record will not be included in the join operation and thus no
+ * output record will be added to the resulting {@link KStream}.
+ * <p>
+ * Example (assuming all input records belong to the correct windows):
+ * <table border='1'>
+ * <tr>
+ * <th>this</th>
+ * <th>other</th>
+ * <th>result</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td><K2:B></td>
+ * <td><K2:b></td>
+ * <td><K2:ValueJoiner(B,b)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K3:c></td>
+ * <td></td>
+ * </tr>
+ * </table>
+ * Keep in mind that the <em>order</em> of the result depends on the non-deterministic processing order of the
+ * input streams.
+ * <p>
+ * Both input streams need to be co-partitioned on the join key.
+ * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
+ * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
+ * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally
+ * generated name, and "-repartition" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * <p>
+ * Repartitioning can happen for one or both of the joining {@link KStream}s.
+ * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
+ * records to and rereading all records from it, such that the join input {@link KStream} is partitioned correctly
+ * on its key.
+ * <p>
* Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
- * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
- * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
- * {@link org.apache.kafka.streams.StreamsConfig}.
- *
- * @param otherStream the instance of {@link KStream} joined with this stream
- * @param joiner the instance of {@link ValueJoiner}
- * @param windows the specification of the {@link JoinWindows}
- * @param <V1> the value type of the other stream
- * @param <R> the value type of the new stream
- * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
- * one for each matched record-pair with the same key and within the joining window intervals
- */
- <V1, R> KStream<K, R> join(
- KStream<K, V1> otherStream,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows);
-
- /**
- * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
- * If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with an auto-generated
- * store name.
- * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
- * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
- * {@link org.apache.kafka.streams.StreamsConfig}.
- *
- * @param otherStream the instance of {@link KStream} joined with this stream
- * @param joiner the instance of {@link ValueJoiner}
- * @param windows the specification of the {@link JoinWindows}
- * @param keySerde key serdes for materializing both streams,
- * if not specified the default serdes defined in the configs will be used
- * @param thisValueSerde value serdes for materializing this stream,
- * if not specified the default serdes defined in the configs will be used
- * @param otherValueSerde value serdes for materializing the other stream,
- * if not specified the default serdes defined in the configs will be used
- * @param <V1> the value type of the other stream
- * @param <R> the value type of the new stream
- *
- * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
- * one for each matched record-pair with the same key and within the joining window intervals
- */
- <V1, R> KStream<K, R> outerJoin(
- KStream<K, V1> otherStream,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValueSerde,
- Serde<V1> otherValueSerde);
-
- /**
- * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
- * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
- * store names.
- * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
- * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
- * {@link org.apache.kafka.streams.StreamsConfig}.
- *
- * @param otherStream the instance of {@link KStream} joined with this stream
- * @param joiner the instance of {@link ValueJoiner}
- * @param windows the specification of the {@link JoinWindows}
- * @param <V1> the value type of the other stream
- * @param <R> the value type of the new stream
- *
- * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
- * one for each matched record-pair with the same key and within the joining window intervals
- */
- <V1, R> KStream<K, R> outerJoin(
- KStream<K, V1> otherStream,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows);
-
- /**
- * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
- * If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
- * store names.
- * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
- * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
- * {@link org.apache.kafka.streams.StreamsConfig}.
- *
- * @param otherStream the instance of {@link KStream} joined with this stream
- * @param joiner the instance of {@link ValueJoiner}
- * @param windows the specification of the {@link JoinWindows}
- * @param keySerde key serdes for materializing the other stream,
- * if not specified the default serdes defined in the configs will be used
- * @param thisValSerde value serdes for materializing this stream,
- * if not specified the default serdes defined in the configs will be used
- * @param otherValueSerde value serdes for materializing the other stream,
- * if not specified the default serdes defined in the configs will be used
- * @param <V1> the value type of the other stream
- * @param <R> the value type of the new stream
- *
- * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
- * one for each matched record-pair with the same key and within the joining window intervals
+ * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
+ * in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
+ * internally generated name, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
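+ * <p>
+ * For example (a minimal sketch; stream names, value types, and the 5-minute window are illustrative only):
+ * <pre>{@code
+ * KStream<String, Double> prices = ...;
+ * KStream<String, Long> quantities = ...;
+ * KStream<String, Double> revenue = prices.join(
+ *     quantities,
+ *     (price, quantity) -> price * quantity, // ValueJoiner
+ *     JoinWindows.of(5 * 60 * 1000L));       // records must be at most 5 minutes apart to join
+ * }</pre>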
+ *
+ * @param otherStream the {@link KStream} to be joined with this stream
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param windows the specification of the {@link JoinWindows}
+ * @param <VO> the value type of the other stream
+ * @param <VR> the value type of the result stream
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
+ * @see #leftJoin(KStream, ValueJoiner, JoinWindows)
+ * @see #outerJoin(KStream, ValueJoiner, JoinWindows)
*/
- <V1, R> KStream<K, R> leftJoin(
- KStream<K, V1> otherStream,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows,
- Serde<K> keySerde,
- Serde<V> thisValSerde,
- Serde<V1> otherValueSerde);
+ <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
+ final ValueJoiner<V, VO, VR> joiner,
+ final JoinWindows windows);
/**
- * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
- * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
- * store names.
- * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
- * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
- * {@link org.apache.kafka.streams.StreamsConfig}.
- *
- * @param otherStream the instance of {@link KStream} joined with this stream
- * @param joiner the instance of {@link ValueJoiner}
- * @param windows the specification of the {@link JoinWindows}
- * @param <V1> the value type of the other stream
- * @param <R> the value type of the new stream
- *
- * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
- * one for each matched record-pair with the same key and within the joining window intervals
+ * Join records of this stream with another {@link KStream}'s records using windowed inner equi join.
+ * The join is a primary key join with join attribute {@code thisKStream.key == otherKStream.key}.
+ * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
+ * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
+ * <p>
+ * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute
+ * a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * If an input record key or value is {@code null} the record will not be included in the join operation and thus no
+ * output record will be added to the resulting {@link KStream}.
+ * <p>
+ * Example (assuming all input records belong to the correct windows):
+ * <table border='1'>
+ * <tr>
+ * <th>this</th>
+ * <th>other</th>
+ * <th>result</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td><K2:B></td>
+ * <td><K2:b></td>
+ * <td><K2:ValueJoiner(B,b)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K3:c></td>
+ * <td></td>
+ * </tr>
+ * </table>
+ * Keep in mind that the <em>order</em> of the result depends on the non-deterministic processing order of the
+ * input streams.
+ * <p>
+ * Both input streams need to be co-partitioned on the join key.
+ * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
+ * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
+ * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally
+ * generated name, and "-repartition" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ * <p>
+ * Repartitioning can happen for one or both of the joining {@link KStream}s.
+ * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
+ * records to and rereading all records from it, such that the join input {@link KStream} is partitioned correctly
+ * on its key.
+ * <p>
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
+ * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified
+ * in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
+ * internally generated name, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
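+ * <p>
+ * For example (a minimal sketch; stream names, types, and serdes are illustrative only; {@code Serdes} refers to
+ * {@code org.apache.kafka.common.serialization.Serdes}):
+ * <pre>{@code
+ * KStream<String, Double> prices = ...;
+ * KStream<String, Long> quantities = ...;
+ * KStream<String, Double> revenue = prices.join(
+ *     quantities,
+ *     (price, quantity) -> price * quantity,
+ *     JoinWindows.of(5 * 60 * 1000L),
+ *     Serdes.String(),  // keySerde
+ *     Serdes.Double(),  // thisValueSerde
+ *     Serdes.Long());   // otherValueSerde
+ * }</pre>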
+ *
+ * @param otherStream the {@link KStream} to be joined with this stream
+ * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+ * @param windows the specification of the {@link JoinWindows}
+ * @param keySerde key serdes for materializing both streams,
+ * if not specified the default serdes defined in the configs will be used
+ * @param thisValueSerde value serdes for materializing this stream,
+ * if not specified the default serdes defined in the configs will be used
+ * @param otherValueSerde value serdes for materializing the other stream,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <VO> the value type of the other stream
+ * @param <VR> the value type of the result stream
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given
+ * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
+ * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
+ * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
*/
- <V1, R> KStream<K, R> leftJoin(
- KStream<K, V1> otherStream,
- ValueJoiner<V, V1, R> joiner,
- JoinWindows windows);
+ <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
+ final ValueJoiner<V, VO, VR> joiner,
+ final JoinWindows windows,
+ final Serde<K> keySerde,
+ final Serde<V> thisValueSerde,
+ final Serde<VO> otherValueSerde);
/**
- * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
- * If a record key or value is {@code null} it will not included in the resulting {@link KStream}
- *
- * @param table the instance of {@link KTable} joined with this stream
- * @param joiner the instance of {@link ValueJoiner}
- * @param <V1> the value type of the table
- * @param <V2> the value type of the new stream
- * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
- * one for each matched record-pair with the same key
+ * Join records of this stream with another {@link KStream}'s records using windowed left equi join with default
+ * serializers and deserializers.
+ * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join}, all records from this stream will
+ * produce at least one output record (cf. below).
+ * The join is a primary key join with join attribute {@code thisKStream.key == otherKStream.key}.
+ * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
+ * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
+ * <p>
+ * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute
+ * a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ * Furthermore, for each input record of this {@link KStream} that does not satisfy the join predicate the provided
+ * {@link ValueJoiner} will be called with a {@code null} value for the other stream.
+ * If an input record key or value is {@code null} the record will not be included in the join operation and thus no
+ * output record will be added to the resulting {@link KStream}.
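+ * <p>
+ * For example (a minimal sketch; stream names and the fallback logic are illustrative only):
+ * <pre>{@code
+ * KStream<String, String> enriched = left.leftJoin(
+ *     right,
+ *     (leftValue, rightValue) -> rightValue == null ? leftValue : leftValue + "/" + rightValue,
+ *     JoinWindows.of(60 * 1000L)); // unmatched records of this stream are joined with rightValue == null
+ * }</pre>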
+ * <p>
+ * Example (assuming all input records belong to the correct windows):
+ * <table border='1'>
+ * <tr>
+ * <th>this</th>
+ * <th>other</th>
+ * <th>result</th>
+ * </tr>
+ * <tr>
+ * <td><K1:A></td>
+ * <td></td>
+ * <td><K1:ValueJoiner(A,null)></td>
+ * </tr>
+ * <tr>
+ * <td><K2:B></td>
+ * <td><K2:b></td>
+ * <td><K2:ValueJoiner(B,b)></td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td><K3:c></td>
+ * <td></td>
+ * </tr>
+ * </table>
+ * Keep in mind that the <em>order</em> of the result depends on the non-deterministic processing order of the
+ * input streams.
+ * The non-deterministic processing order might also lead to unexpected (but correct)
<TRUNCATED>
[2/2] kafka git commit: MINOR: Update JavaDoc of KStream interface
Posted by gu...@apache.org.
MINOR: Update JavaDoc of KStream interface
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy, Eno Thereska, Guozhang Wang
Closes #2153 from mjsax/javaDocKStreams
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1949a76b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1949a76b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1949a76b
Branch: refs/heads/trunk
Commit: 1949a76bc4189534b853e21c476bb11172fa3fc9
Parents: 600859e
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Dec 8 11:07:59 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 8 11:07:59 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/kstream/KGroupedStream.java | 634 ++++--
.../apache/kafka/streams/kstream/KStream.java | 2020 +++++++++++++-----
2 files changed, 2028 insertions(+), 626 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1949a76b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index f47c904..33a2791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -4,231 +4,569 @@
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.WindowStore;
/**
- * {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> of key-value pairs
- * usually grouped on a different key than the original stream key
- *
+ * {@link KGroupedStream} is an abstraction of a <i>grouped</i> record stream of key-value pairs.
+ * It is an intermediate representation of a {@link KStream} in order to apply an aggregation operation on the original
+ * {@link KStream} records.
* <p>
- * It is an intermediate representation of a {@link KStream} before an
- * aggregation is applied to the new partitions resulting in a new {@link KTable}.
+ * A {@link KGroupedStream} must be obtained from a {@link KStream} via {@link KStream#groupByKey() #groupByKey()} or
+ * {@link KStream#groupBy(KeyValueMapper) #groupBy(...)}.
+ *
* @param <K> Type of keys
* @param <V> Type of values
- *
* @see KStream
*/
@InterfaceStability.Unstable
public interface KGroupedStream<K, V> {
-
/**
- * Combine values of this stream by the grouped key into a new instance of ever-updating
- * {@link KTable}. The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Count the number of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code storeName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-word";
+ * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
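+ * <p>
+ * For example (a minimal sketch; the grouped stream and the store name are illustrative only):
+ * <pre>{@code
+ * KGroupedStream<String, String> groupedWords = ...;
+ * KTable<String, Long> wordCounts = groupedWords.count("word-counts");
+ * }</pre>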
*
- * @param reducer the instance of {@link Reducer}
- * @param storeName the name of the underlying {@link KTable} state store
- *
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+ * @param storeName the name of the underlying {@link KTable} state store
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) count (i.e., number of records) for each key
*/
- KTable<K, V> reduce(Reducer<V> reducer,
- final String storeName);
+ KTable<K, Long> count(final String storeName);
/**
- * Combine values of this stream by the grouped key into a new instance of ever-updating
- * {@link KTable}. The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Count the number of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String storeName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-word";
+ * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param reducer the instance of {@link Reducer}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) count (i.e., number of records) for each key
*/
- KTable<K, V> reduce(final Reducer<V> reducer,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
- * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Count the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code storeName}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-word";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
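+ * <p>
+ * For example (a minimal sketch; the one-minute tumbling window and the store name are illustrative only):
+ * <pre>{@code
+ * KTable<Windowed<String>, Long> windowedWordCounts =
+ *     groupedWords.count(TimeWindows.of(60 * 1000L), "windowed-word-counts");
+ * }</pre>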
*
- * @param reducer the instance of {@link Reducer}
- * @param windows the specification of the aggregation {@link Windows}
- * @param storeName the name of the state store created from this operation
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) aggregate for each key within that window
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param storeName the name of the underlying {@link KTable} state store
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) count (i.e., number of records) for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
- Windows<W> windows,
- final String storeName);
+ <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+ final String storeName);
/**
- * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Count the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String storeName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-word";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param reducer the instance of {@link Reducer}
* @param windows the specification of the aggregation {@link Windows}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) aggregate for each key within that window
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) count (i.e., number of records) for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
- Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
-
+ <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
/**
- * Aggregate values of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Combine the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code storeName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
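+ * <p>
+ * For example, a rolling sum (a minimal sketch; the grouped stream and the store name are illustrative only):
+ * <pre>{@code
+ * KGroupedStream<String, Long> groupedAmounts = ...;
+ * KTable<String, Long> sums = groupedAmounts.reduce((aggValue, newValue) -> aggValue + newValue, "sums");
+ * }</pre>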
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
- * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param storeName the name of the state store created from this operation
- * @param <T> the value type of the resulting {@link KTable}
- *
- * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param storeName the name of the underlying {@link KTable} state store
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- Serde<T> aggValueSerde,
- final String storeName);
+ KTable<K, V> reduce(final Reducer<V> reducer,
+ final String storeName);
/**
- * Aggregate values of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Combine the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}).
+ * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
+ * @param reducer a {@link Reducer} that computes a new aggregate result
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @param <T> the value type of the resulting {@link KTable}
- * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, V> reduce(final Reducer<V> reducer,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
- * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Combine the values of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code storeName}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
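+ * <p>
+ * For example, a windowed sum (a minimal sketch; the window size and the store name are illustrative only):
+ * <pre>{@code
+ * KTable<Windowed<String>, Long> windowedSums = groupedAmounts.reduce(
+ *     (aggValue, newValue) -> aggValue + newValue,
+ *     TimeWindows.of(60 * 1000L),
+ *     "windowed-sums");
+ * }</pre>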
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> sumForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
- * @param windows the specification of the aggregation {@link Windows}
- * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param <T> the value type of the resulting {@link KTable}
- * @param storeName the name of the state store created from this operation
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values with type {@code T}
- * that represent the latest (rolling) aggregate for each key within that window
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param storeName the name of the state store created from this operation
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- Windows<W> windows,
- Serde<T> aggValueSerde,
- final String storeName);
+ <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows,
+ final String storeName);
/**
- * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Combine the values of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * String storeName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> sumForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
+ * @param reducer a {@link Reducer} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
- * @param <T> the value type of the resulting {@link KTable}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values with type {@code T}
- * that represent the latest (rolling) aggregate for each key within that window
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
+
/**
- * Count number of records of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it allows the
+ * result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code storeName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
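+ * For example, a cache of roughly 10 MB could be configured as follows (a minimal sketch; the size is an
+ * illustrative assumption):
+ * <pre>{@code
+ * Properties props = new Properties();
+ * // other mandatory configs (application ID, bootstrap servers, ...) omitted
+ * props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+ * }</pre>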
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+ * count (c.f. {@link #count(String)}), sum, or average.
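+ * For example, a sum of {@code Integer} values aggregated into a {@code Long} (a minimal sketch; stream contents
+ * and the store name are illustrative assumptions):
+ * <pre>{@code
+ * KStream<String, Integer> stream = ...; // assumed input stream
+ * KTable<String, Long> sums = stream.aggregate(
+ *     () -> 0L,                                     // Initializer: start each sum at zero
+ *     (key, value, aggregate) -> aggregate + value, // Aggregator: add the new value
+ *     Serdes.Long(),                                // serde for the Long aggregate
+ *     "sums-store");                                // assumed store name
+ * }</pre>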
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation with result value type Long
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param storeName the name of the underlying {@link KTable} state store
- *
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param storeName the name of the state store created from this operation
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- KTable<K, Long> count(final String storeName);
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final Serde<VR> aggValueSerde,
+ final String storeName);
/**
- * Count number of records of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it
+ * allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
+ * like count (c.f. {@link #count(String)}), sum, or average.
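+ * For example, the same sum as for {@link #aggregate(Initializer, Aggregator, Serde, String)} but materialized in
+ * a user-defined store (a minimal sketch; the store supplier is an illustrative assumption):
+ * <pre>{@code
+ * StateStoreSupplier<KeyValueStore> storeSupplier = ...; // user-defined store, e.g., created via the Stores factory
+ * KStream<String, Integer> stream = ...; // assumed input stream
+ * KTable<String, Long> sums = stream.aggregate(
+ *     () -> 0L,                                     // Initializer: start each sum at zero
+ *     (key, value, aggregate) -> aggregate + value, // Aggregator: add the new value
+ *     storeSupplier);
+ * }</pre>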
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation with result value type Long
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- *
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
- * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Aggregate the values of records in this stream by the grouped key and defined windows.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, Windows, String) combining via reduce(...)} as it
+ * allows the result to have a different type than the input values.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code storeName}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * The specified {@link Initializer} is applied once per window directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
+ * functions like count (c.f. {@link #count(String)}), sum, or average.
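+ * For example, a per-window sum of {@code Integer} values aggregated into a {@code Long} (a minimal sketch;
+ * stream contents, window size, and store name are illustrative assumptions):
+ * <pre>{@code
+ * KStream<String, Integer> stream = ...; // assumed input stream
+ * KTable<Windowed<String>, Long> windowedSums = stream.aggregate(
+ *     () -> 0L,                                     // Initializer: start each window's sum at zero
+ *     (key, value, aggregate) -> aggregate + value, // Aggregator: add the new value
+ *     TimeWindows.of(60 * 1000L),                   // 1-minute windows (size is an assumption)
+ *     Serdes.Long(),                                // serde for the Long aggregate
+ *     "windowed-sums-store");                       // assumed store name
+ * }</pre>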
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some windowed aggregation with result value type Long
+ * ReadOnlyWindowStore<String, Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> aggForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param windows the specification of the aggregation {@link Windows}
- * @param storeName the name of the state store created from this operation
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) count (i.e., number of records) for each key within that window
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @param storeName the name of the state store created from this operation
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, final String storeName);
+ <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final Windows<W> windows,
+ final Serde<VR> aggValueSerde,
+ final String storeName);
/**
- * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Aggregate the values of records in this stream by the grouped key and defined windows.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via
+ * reduce(...)} as it allows the result to have a different type than the input values.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Initializer} is applied once per window directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
+ * functions like count (c.f. {@link #count(String)}), sum, or average.
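+ * For example, the same per-window sum but materialized in a user-defined windowed store (a minimal sketch; the
+ * store supplier is an illustrative assumption):
+ * <pre>{@code
+ * StateStoreSupplier<WindowStore> storeSupplier = ...; // user-defined windowed store
+ * KStream<String, Integer> stream = ...; // assumed input stream
+ * KTable<Windowed<String>, Long> windowedSums = stream.aggregate(
+ *     () -> 0L,                                     // Initializer: start each window's sum at zero
+ *     (key, value, aggregate) -> aggregate + value, // Aggregator: add the new value
+ *     TimeWindows.of(60 * 1000L),                   // 1-minute windows (size is an assumption)
+ *     storeSupplier);
+ * }</pre>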
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some windowed aggregation with result value type Long
+ * String storeName = storeSupplier.name();
+ * ReadOnlyWindowStore<String, Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> aggForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
+ * @param <VR> the value type of the resulting {@link KTable}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) count (i.e., number of records) for each key within that window
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
}