Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 23:16:02 UTC
[5/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for queryable stores
KAFKA-5045: Clarify on KTable APIs for queryable stores
This is the implementation of KIP-114: KTable state stores and improved semantics:
- Allow decoupling between querying and materialization
- Consistent APIs: overloads with and without queryableStoreName
- Deprecated several KTable calls
- New unit and integration tests
In this implementation, state stores are materialized if the user desires them to be queryable. In subsequent versions we can offer a second option: a view-like state store. The trade-off then would be between storage space (materialize) and re-computation (view), and later query optimizers can exploit that trade-off.
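For context, a minimal sketch of how the two overload families introduced here are meant to be used. The topic name, store name, and application id below are illustrative (not taken from this patch), and the snippet assumes default String serdes; querying the store may fail with InvalidStateStoreException until the instance is fully started.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;

public class QueryableCountExample {
    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> words = builder.stream("words-topic"); // illustrative topic name

        // Materialized *and* queryable: pass an explicit store name.
        final KTable<String, Long> queryableCounts =
            words.groupByKey().count("word-counts"); // illustrative store name

        // Materialized under an internal name: the new no-arg overload; the internal
        // name is not guaranteed to be reachable via Interactive Queries.
        final KTable<String, Long> internalCounts = words.groupByKey().count();

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "queryable-count-example");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();

        // Only the explicitly named store is discoverable through KafkaStreams#store();
        // this call may throw InvalidStateStoreException until the store is ready.
        final ReadOnlyKeyValueStore<String, Long> store =
            streams.store("word-counts", QueryableStoreTypes.<String, Long>keyValueStore());
        final Long count = store.get("some-word"); // key must be hosted by this instance

        streams.close();
    }
}

The named variant keeps the store reachable via Interactive Queries, while the no-arg variant leaves naming to the library, matching the querying/materialization decoupling described above.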
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes #2832 from enothereska/KAFKA-5045-ktable
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec9e4eaf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec9e4eaf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec9e4eaf
Branch: refs/heads/trunk
Commit: ec9e4eafa406fec897713310bafdedf6bbb3c0c5
Parents: a3952ae
Author: Eno Thereska <en...@gmail.com>
Authored: Wed May 3 16:15:54 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 3 16:15:54 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/KGroupedStream.java | 552 +++++++--
.../kafka/streams/kstream/KGroupedTable.java | 323 +++++-
.../kafka/streams/kstream/KStreamBuilder.java | 479 +++++++-
.../apache/kafka/streams/kstream/KTable.java | 1094 +++++++++++++++++-
.../kstream/internals/AbstractStream.java | 7 +
.../kstream/internals/KGroupedStreamImpl.java | 128 +-
.../kstream/internals/KGroupedTableImpl.java | 113 +-
.../streams/kstream/internals/KStreamImpl.java | 2 +-
.../streams/kstream/internals/KTableFilter.java | 26 +-
.../streams/kstream/internals/KTableImpl.java | 326 +++++-
.../internals/KTableKTableJoinMerger.java | 35 +-
.../kstream/internals/KTableMapValues.java | 27 +-
.../streams/processor/TopologyBuilder.java | 20 +-
.../KStreamAggregationIntegrationTest.java | 28 +-
.../KTableKTableJoinIntegrationTest.java | 155 ++-
.../QueryableStateIntegrationTest.java | 178 +++
.../streams/kstream/KStreamBuilderTest.java | 55 +-
.../internals/KGroupedStreamImplTest.java | 279 +++--
.../internals/KGroupedTableImplTest.java | 72 +-
.../kstream/internals/KTableAggregateTest.java | 55 +-
.../kstream/internals/KTableFilterTest.java | 245 +++-
.../kstream/internals/KTableImplTest.java | 32 +-
.../kstream/internals/KTableKTableJoinTest.java | 162 +--
.../kstream/internals/KTableMapValuesTest.java | 128 +-
.../streams/processor/TopologyBuilderTest.java | 2 +-
.../internals/ProcessorTopologyTest.java | 4 +-
26 files changed, 3793 insertions(+), 734 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index c961c7e..2cdf047 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -49,7 +49,7 @@ public interface KGroupedStream<K, V> {
* Count the number of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code storeName}.
+ * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -63,7 +63,7 @@ public interface KGroupedStream<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -73,18 +73,44 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
- KTable<K, Long> count(final String storeName);
+ KTable<K, Long> count(final String queryableStoreName);
+
+ /**
+ * Count the number of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view).
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+ * represent the latest (rolling) count (i.e., number of records) for each key
+ */
+ KTable<K, Long> count();
/**
* Count the number of records in this stream by the grouped key.
@@ -105,15 +131,15 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * String storeName = storeSupplier.name();
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
@@ -125,7 +151,7 @@ public interface KGroupedStream<K, V> {
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
- * materialized view) that can be queried using the provided {@code storeName}.
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
* Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -141,7 +167,7 @@ public interface KGroupedStream<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
* String key = "some-word";
* long fromTime = ...;
* long toTime = ...;
@@ -153,20 +179,52 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param windows the specification of the aggregation {@link Windows}
- * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
*/
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
- final String storeName);
+ final String queryableStoreName);
+
+ /**
+ * Count the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} key or value are ignored.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view).
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param windows the specification of the aggregation {@link Windows}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+ * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ */
+ <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows);
/**
* Count the number of records in this stream by the grouped key and the defined windows.
@@ -191,8 +249,8 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * String storeName = storeSupplier.name();
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
* String key = "some-word";
* long fromTime = ...;
* long toTime = ...;
@@ -202,7 +260,7 @@ public interface KGroupedStream<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param windows the specification of the aggregation {@link Windows}
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
*/
@@ -214,7 +272,7 @@ public interface KGroupedStream<K, V> {
* Count the number of records in this stream by the grouped key into {@link SessionWindows}.
* Records with {@code null} key or value are ignored.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating
- * materialized view) that can be queried using the provided {@code storeName}.
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
* SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -226,27 +284,48 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link SessionStore} it must be obtained via
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
- * KafkaStreams streams = ... // counting words
- * String storeName = storeSupplier.name();
- * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
- * String key = "some-word";
- * KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * KafkaStreams streams = ... // compute sum
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String key = "some-key";
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
+ * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+ * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)}.
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+ * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+ */
+ KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
+
+ /**
+ * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
+ * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
- * @param storeName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
*/
- KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);
+ KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
/**
* Count the number of records in this stream by the grouped key into {@link SessionWindows}.
@@ -264,22 +343,21 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link SessionStore} it must be obtained via
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
- * KafkaStreams streams = ... // counting words
- * String storeName = storeSupplier.name();
- * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
- * String key = "some-word";
- * KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * KafkaStreams streams = ... // compute sum
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String key = "some-key";
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- *
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
*/
@@ -292,7 +370,42 @@ public interface KGroupedStream<K, V> {
* Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code storeName}.
+ * that can be queried using the provided {@code queryableStoreName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ KTable<K, V> reduce(final Reducer<V> reducer);
+
+ /**
+ * Combine the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
@@ -312,7 +425,7 @@ public interface KGroupedStream<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -322,20 +435,21 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
- * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
KTable<K, V> reduce(final Reducer<V> reducer,
- final String storeName);
+ final String queryableStoreName);
+
/**
* Combine the value of records in this stream by the grouped key.
@@ -365,8 +479,8 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * String storeName = storeSupplier.name();
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -374,7 +488,7 @@ public interface KGroupedStream<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
@@ -389,7 +503,7 @@ public interface KGroupedStream<K, V> {
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
- * materialized view) that can be queried using the provided {@code storeName}.
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
* Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -411,7 +525,7 @@ public interface KGroupedStream<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
* String key = "some-key";
* long fromTime = ...;
* long toTime = ...;
@@ -423,22 +537,64 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
- * @param storeName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Windows<W> windows,
- final String storeName);
+ final String queryableStoreName);
+
+ /**
+ * Combine the number of records in this stream by the grouped key and the defined windows.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions like sum, min, or max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param windows the specification of the aggregation {@link Windows}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final Windows<W> windows);
/**
* Combine the values of records in this stream by the grouped key and the defined windows.
@@ -472,8 +628,8 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * Sting storeName = storeSupplier.name();
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
* String key = "some-key";
* long fromTime = ...;
* long toTime = ...;
@@ -484,7 +640,7 @@ public interface KGroupedStream<K, V> {
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
@@ -498,7 +654,7 @@ public interface KGroupedStream<K, V> {
* Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}).
* The result is written into a local {@link SessionStore} (which is basically an ever-updating
- * materialized view) that can be queried using the provided {@code storeName}.
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
* SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -517,13 +673,15 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link SessionStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -531,21 +689,53 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @param reducer the instance of {@link Reducer}
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
- * @param storeName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final SessionWindows sessionWindows,
- final String storeName);
+ final String queryableStoreName);
+
+ /**
+ * Combine values of this stream by the grouped key into {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}).
+ * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
+ * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate and the record's value.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer, SessionWindows, String)} can be used to compute aggregate functions like sum, min,
+ * or max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * @param reducer the instance of {@link Reducer}
+ * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+ final SessionWindows sessionWindows);
/**
* Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -572,15 +762,15 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
- * To query the local {@link SessionStore} it must be obtained via
+ * To query the local windowed {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // compute sum
- * Sting storeName = storeSupplier.name();
- * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
- * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -588,14 +778,14 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @param reducer the instance of {@link Reducer}
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
@@ -610,7 +800,7 @@ public interface KGroupedStream<K, V> {
* Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
* allows the result to have a different type than the input values.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code storeName}.
+ * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to
@@ -632,7 +822,7 @@ public interface KGroupedStream<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -642,18 +832,18 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
- * @param storeName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
@@ -661,7 +851,52 @@ public interface KGroupedStream<K, V> {
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde,
- final String storeName);
+ final String queryableStoreName);
+
+ /**
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+ * allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+ * count (c.f. {@link #count(String)}).
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Serde<VR> aggValueSerde);
+
/**
* Aggregate the values of records in this stream by the grouped key.
@@ -692,8 +927,8 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some aggregation on value type double
- * Sting storeName = storeSupplier.name();
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -702,7 +937,7 @@ public interface KGroupedStream<K, V> {
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
* @param aggregator an {@link Aggregator} that computes a new aggregate result
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
@@ -719,7 +954,7 @@ public interface KGroupedStream<K, V> {
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
* {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
* The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
- * materialized view) that can be queried using the provided {@code storeName}.
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
* Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -743,7 +978,7 @@ public interface KGroupedStream<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
* String key = "some-key";
* long fromTime = ...;
* long toTime = ...;
@@ -755,10 +990,10 @@ public interface KGroupedStream<K, V> {
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
* alphanumerics, '.', '_' and '-'.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
*
@@ -768,8 +1003,8 @@ public interface KGroupedStream<K, V> {
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param <VR> the value type of the resulting {@link KTable}
- * @param storeName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
@@ -777,7 +1012,58 @@ public interface KGroupedStream<K, V> {
final Aggregator<? super K, ? super V, VR> aggregator,
final Windows<W> windows,
final Serde<VR> aggValueSerde,
- final String storeName);
+ final String queryableStoreName);
+
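A sketch of the queryable windowed variant above, computing a per-minute sum over a String-keyed stream of Long values (the stream "pageViews" and the store name "views-per-minute" are assumptions):

    // A non-null queryableStoreName materializes the windowed store under that name;
    // passing null behaves like the overload without a store name (internal, non-queryable name).
    KTable<Windowed<String>, Long> viewsPerMinute = pageViews
        .groupByKey(Serdes.String(), Serdes.Long())
        .aggregate(
            () -> 0L,
            (key, value, aggregate) -> aggregate + value,
            TimeWindows.of(60 * 1000L),   // 1-minute tumbling windows
            Serdes.Long(),
            "views-per-minute");          // queryable later via KafkaStreams#store(...)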
+ /**
+ * Aggregate the values of records in this stream by the grouped key and defined windows.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, Windows, String) combining via reduce(...)} as it,
+ * for example, allows the result to have a different type than the input values.
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view).
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Initializer} is applied once per window directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde)} can be used to compute aggregate
+ * functions like count (c.f. {@link #count(String)}).
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param windows the specification of the aggregation {@link Windows}
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Windows<W> windows,
+ final Serde<VR> aggValueSerde);
+
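A sketch of the non-queryable windowed overload declared above, counting String-valued records per key and hour (the stream "events" is an assumption):

    KTable<Windowed<String>, Long> countsPerHour = events
        .groupByKey(Serdes.String(), Serdes.String())
        .aggregate(
            () -> 0L,
            (key, value, aggregate) -> aggregate + 1L,  // count records rather than summing values
            TimeWindows.of(60 * 60 * 1000L),            // 1-hour tumbling windows
            Serdes.Long());                             // no queryableStoreName: internal store name only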
/**
* Aggregate the values of records in this stream by the grouped key and defined windows.
@@ -812,8 +1098,8 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type Long
- * Sting storeName = storeSupplier.name();
- * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
* String key = "some-key";
* long fromTime = ...;
* long toTime = ...;
@@ -826,7 +1112,7 @@ public interface KGroupedStream<K, V> {
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
* @param <VR> the value type of the resulting {@link KTable}
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
@@ -841,7 +1127,7 @@ public interface KGroupedStream<K, V> {
* Aggregating is a generalization of {@link #reduce(Reducer, SessionWindows, String) combining via
* reduce(...)} as it, for example, allows the result to have a different type than the input values.
* The result is written into a local {@link SessionStore} (which is basically an ever-updating
- * materialized view) that can be queried using the provided {@code storeName}.
+ * materialized view) that can be queried using the provided {@code queryableStoreName}.
* SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
* Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -866,8 +1152,8 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double
- * Sting storeName = storeSupplier.name();
- * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -881,8 +1167,8 @@ public interface KGroupedStream<K, V> {
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param <T> the value type of the resulting {@link KTable}
- * @param storeName the name of the state store created from this operation; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
*/
@@ -891,7 +1177,49 @@ public interface KGroupedStream<K, V> {
final Merger<? super K, T> sessionMerger,
final SessionWindows sessionWindows,
final Serde<T> aggValueSerde,
- final String storeName);
+ final String queryableStoreName);
+
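A sketch of the session-windowed variant above, including a lookup against the named store (the stream "userActions", the store name "actions-per-session", and the running KafkaStreams instance "streams" are assumptions):

    KTable<Windowed<String>, Long> actionsPerSession = userActions
        .groupByKey(Serdes.String(), Serdes.Long())
        .aggregate(
            () -> 0L,
            (key, value, aggregate) -> aggregate + value,
            (key, leftAgg, rightAgg) -> leftAgg + rightAgg,  // Merger: combine aggregates when sessions merge
            SessionWindows.with(5 * 60 * 1000L),             // close a session after 5 minutes of inactivity
            Serdes.Long(),
            "actions-per-session");

    // Once the application is running, the store can be queried locally by its name:
    ReadOnlySessionStore<String, Long> sessionStore =
        streams.store("actions-per-session", QueryableStoreTypes.<String, Long>sessionStore());
    KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.fetch("some-key");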
+ /**
+ * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, SessionWindows, String) combining via
+ * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+ * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+ * materialized view).
+ * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * The specified {@link Initializer} is applied once per session directly before the first input record is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)} can be used to compute
+ * aggregate functions like count (c.f. {@link #count(String)}).
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same window and key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param sessionMerger a {@link Merger} that combines two aggregation results when sessions are merged
+ * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <T> the value type of the resulting {@link KTable}
+ * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+ * the latest (rolling) aggregate for each key within a window
+ */
+ <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+ final Aggregator<? super K, ? super V, T> aggregator,
+ final Merger<? super K, T> sessionMerger,
+ final SessionWindows sessionWindows,
+ final Serde<T> aggValueSerde);
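A sketch of the overload declared above, which omits the store name: the session store gets an internal name and is not registered for Interactive Queries (the stream "userActions" is an assumption):

    KTable<Windowed<String>, Long> eventsPerSession = userActions
        .groupByKey(Serdes.String(), Serdes.Long())
        .aggregate(
            () -> 0L,
            (key, value, aggregate) -> aggregate + 1L,       // count events per session
            (key, leftAgg, rightAgg) -> leftAgg + rightAgg,  // Merger for sessions joined by a late record
            SessionWindows.with(30 * 60 * 1000L),            // 30-minute inactivity gap
            Serdes.Long());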
/**
* Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -924,8 +1252,8 @@ public interface KGroupedStream<K, V> {
* Use {@link StateStoreSupplier#name()} to get the store name:
* <pre>{@code
* KafkaStreams streams = ... // some windowed aggregation on value type double
- * Sting storeName = storeSupplier.name();
- * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
* String key = "some-key";
* KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -939,7 +1267,7 @@ public interface KGroupedStream<K, V> {
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @param <T> the value type of the resulting {@link KTable}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window