You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/08 19:08:03 UTC
[2/2] kafka git commit: MINOR: Update JavaDoc of KStream interface
MINOR: Update JavaDoc of KStream interface
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy, Eno Thereska, Guozhang Wang
Closes #2153 from mjsax/javaDocKStreams
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1949a76b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1949a76b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1949a76b
Branch: refs/heads/trunk
Commit: 1949a76bc4189534b853e21c476bb11172fa3fc9
Parents: 600859e
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Dec 8 11:07:59 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 8 11:07:59 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/kstream/KGroupedStream.java | 634 ++++--
.../apache/kafka/streams/kstream/KStream.java | 2020 +++++++++++++-----
2 files changed, 2028 insertions(+), 626 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1949a76b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index f47c904..33a2791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -4,231 +4,569 @@
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.WindowStore;
/**
- * {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> of key-value pairs
- * usually grouped on a different key than the original stream key
- *
+ * {@link KGroupedStream} is an abstraction of a <i>grouped</i> record stream of key-value pairs.
+ * It is an intermediate representation of a {@link KStream} in order to apply an aggregation operation on the original
+ * {@link KStream} records.
* <p>
- * It is an intermediate representation of a {@link KStream} before an
- * aggregation is applied to the new partitions resulting in a new {@link KTable}.
+ * A {@link KGroupedStream} must be obtained from a {@link KStream} via {@link KStream#groupByKey() #groupByKey()} or
+ * {@link KStream#groupBy(KeyValueMapper) #groupBy(...)}.
+ *
* @param <K> Type of keys
* @param <V> Type of values
- *
* @see KStream
*/
@InterfaceStability.Unstable
public interface KGroupedStream<K, V> {
-
/**
- * Combine values of this stream by the grouped key into a new instance of ever-updating
- * {@link KTable}. The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Count the number of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code storeName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-word";
+ * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param reducer the instance of {@link Reducer}
- * @param storeName the name of the underlying {@link KTable} state store
- *
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+ * @param storeName the name of the underlying {@link KTable} state store
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) count (i.e., number of records) for each key
*/
- KTable<K, V> reduce(Reducer<V> reducer,
- final String storeName);
+ KTable<K, Long> count(final String storeName);
/**
- * Combine values of this stream by the grouped key into a new instance of ever-updating
- * {@link KTable}. The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Count the number of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String storeName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-word";
+ * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param reducer the instance of {@link Reducer}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) count (i.e., number of records) for each key
*/
- KTable<K, V> reduce(final Reducer<V> reducer,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
- * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Count the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code storeName}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-word";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param reducer the instance of {@link Reducer}
- * @param windows the specification of the aggregation {@link Windows}
- * @param storeName the name of the state store created from this operation
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) aggregate for each key within that window
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param storeName the name of the underlying {@link KTable} state store
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) count (i.e., number of records) for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
- Windows<W> windows,
- final String storeName);
+ <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+ final String storeName);
/**
- * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Count the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String storeName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-word";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param reducer the instance of {@link Reducer}
* @param windows the specification of the aggregation {@link Windows}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) aggregate for each key within that window
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) count (i.e., number of records) for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
- Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
-
+ <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
/**
- * Aggregate values of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Combine the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code storeName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate, the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
- * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param storeName the name of the state store created from this operation
- * @param <T> the value type of the resulting {@link KTable}
- *
- * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param storeName the name of the underlying {@link KTable} state store
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- Serde<T> aggValueSerde,
- final String storeName);
+ KTable<K, V> reduce(final Reducer<V> reducer,
+ final String storeName);
/**
- * Aggregate values of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Combine the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}).
+ * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate, the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * String storeName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
+ * @param reducer a {@link Reducer} that computes a new aggregate result
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @param <T> the value type of the resulting {@link KTable}
- * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- <T> KTable<K, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- final StateStoreSupplier<KeyValueStore> storeSupplier);
+ KTable<K, V> reduce(final Reducer<V> reducer,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
- * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Combine the values of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code storeName}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate, the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> sumForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
- * @param windows the specification of the aggregation {@link Windows}
- * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
- * if not specified the default serdes defined in the configs will be used
- * @param <T> the value type of the resulting {@link KTable}
- * @param storeName the name of the state store created from this operation
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values with type {@code T}
- * that represent the latest (rolling) aggregate for each key within that window
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param storeName the name of the underlying {@link KTable} state store
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- Windows<W> windows,
- Serde<T> aggValueSerde,
- final String storeName);
+ <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows,
+ final String storeName);
/**
- * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Combine the values of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate, the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * String storeName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> sumForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param initializer the instance of {@link Initializer}
- * @param aggregator the instance of {@link Aggregator}
+ * @param reducer a {@link Reducer} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
- * @param <T> the value type of the resulting {@link KTable}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values with type {@code T}
- * that represent the latest (rolling) aggregate for each key within that window
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
- Aggregator<K, V, T> aggregator,
- Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
+
/**
- * Count number of records of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it allows the
+ * result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code storeName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+ * count (c.f. {@link #count(String)}) TODO add more examples.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param storeName the name of the underlying {@link KTable} state store
- *
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param storeName the name of the state store created from this operation
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- KTable<K, Long> count(final String storeName);
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final Serde<VR> aggValueSerde,
+ final String storeName);
/**
- * Count number of records of this stream by key into a new instance of a {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it
+ * allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
+ * like count (c.f. {@link #count(String)}) TODO add more examples.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- *
- * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+ * (rolling) aggregate for each key
*/
- KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
- * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a local state
- * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
- * will be automatically created in Kafka for failure recovery, where "applicationID"
- * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+ * Aggregate the values of records in this stream by the grouped key and defined windows.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, Windows, String) combining via reduce(...)} as it
+ * allows the result to have a different type than the input values.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local {@link WindowStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code storeName}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the cache size.
+ * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * The specified {@link Initializer} is applied once per window directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
+ * functions like count (c.f. {@link #count(String)}) TODO add more examples.
+ * <p>
+ * To query the local {@link WindowStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some windowed aggregation on value type double TODO update example
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> aggForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+ * provided {@code storeName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param windows the specification of the aggregation {@link Windows}
- * @param storeName the name of the state store created from this operation
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) count (i.e., number of records) for each key within that window
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @param storeName the name of the state store created from this operation
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, final String storeName);
+ <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final Windows<W> windows,
+ final Serde<VR> aggValueSerde,
+ final String storeName);
/**
- * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
- * The resulting {@link KTable} will be materialized in a state
- * store provided by the {@link StateStoreSupplier}.
+ * Aggregate the values of records in this stream by the grouped key and defined windows.
+ * Records with {@code null} value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via
+ * reduce(...)} as it allows the result to have a different type than the input values.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local {@link WindowStore} provided by the given {@code storeSupplier}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Initializer} is applied once per window directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@link #aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
+ * functions like count (c.f. {@link #count(String)}) TODO add more examples.
+ * <p>
+ * To query the local {@link WindowStore} it must be obtained via
+ * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some windowed aggregation on value type double TODO update example
+ * String storeName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String key = "some-key";
+ * long fromTime = ...;
+ * long toTime = ...;
+ * WindowStoreIterator<Long> aggForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
*
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
+ * @param <VR> the value type of the resulting {@link KTable}
* @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
- * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
- * where each table contains records with unmodified keys and values
- * that represent the latest (rolling) count (i.e., number of records) for each key within that window
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+ * the latest (rolling) aggregate for each key within a window
*/
- <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows,
- final StateStoreSupplier<WindowStore> storeSupplier);
+ <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<K, V, VR> aggregator,
+ final Windows<W> windows,
+ final StateStoreSupplier<WindowStore> storeSupplier);
}