Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 23:16:01 UTC
[4/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for queryable stores
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 8685e8b..d14e600 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -46,7 +46,7 @@ public interface KGroupedTable<K, V> {
* the same key into a new instance of {@link KTable}.
* Records with {@code null} key are ignored.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code storeName}.
+ * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -60,7 +60,7 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -68,20 +68,48 @@ public interface KGroupedTable<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
* '.', '_' and '-'.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null}, this is equivalent to {@link KGroupedTable#count()}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
- KTable<K, Long> count(final String storeName);
+ KTable<K, Long> count(final String queryableStoreName);
+
+ /**
+ * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
+ * the same key into a new instance of {@link KTable}.
+ * Records with {@code null} key are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+ * represent the latest (rolling) count (i.e., number of records) for each key
+ */
+ KTable<K, Long> count();
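
As a rough illustration of how the two overloads above differ in practice (the store name "counts-store" and the KGroupedTable<String, String> groupedTable are assumptions for this sketch, not part of the patch):

    // materialized under a user-supplied, queryable store name
    KTable<String, Long> counts = groupedTable.count("counts-store");

    // later, via Interactive Queries on the running KafkaStreams instance:
    ReadOnlyKeyValueStore<String, Long> store =
        streams.store("counts-store", QueryableStoreTypes.<String, Long>keyValueStore());
    Long countForKey = store.get("some-key");

    // materialized under an auto-generated internal name; not intended to be
    // queried via Interactive Queries
    KTable<String, Long> internalCounts = groupedTable.count();
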
/**
* Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
@@ -102,8 +130,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * String storeName = storeSupplier.name();
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -111,13 +139,13 @@ public interface KGroupedTable<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
* represent the latest (rolling) count (i.e., number of records) for each key
*/
@@ -130,7 +158,7 @@ public interface KGroupedTable<K, V> {
* Combining implies that the type of the aggregate result is the same as the type of the input value
* (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code storeName}.
+ * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
@@ -167,7 +195,7 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -175,24 +203,80 @@ public interface KGroupedTable<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
* '.', '_' and '-'.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param adder a {@link Reducer} that adds a new value to the aggregate result
* @param subtractor a {@link Reducer} that removed an old value from the aggregate result
- * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics,
- * '.', '_' and '-'
+ * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics,
+ * '.', '_' and '-'. If {@code null}, this is equivalent to {@link KGroupedTable#reduce(Reducer, Reducer)}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor,
- final String storeName);
+ final String queryableStoreName);
+
+ /**
+ * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+ * mapped} to the same key into a new instance of {@link KTable}.
+ * Records with {@code null} key are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+ * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
+ * current aggregate and the record's value by adding the new record to the aggregate.
+ * The specified {@link Reducer subtractor} is applied for each "replaced" record of the original {@link KTable}
+ * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+ * record from the aggregate.
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
+ * For sum, the adder and subtractor would work as follows:
+ * <pre>{@code
+ * public class SumAdder implements Reducer<Integer> {
+ * public Integer apply(Integer currentAgg, Integer newValue) {
+ * return currentAgg + newValue;
+ * }
+ * }
+ *
+ * public class SumSubtractor implements Reducer<Integer> {
+ * public Integer apply(Integer currentAgg, Integer oldValue) {
+ * return currentAgg - oldValue;
+ * }
+ * }
+ * }</pre>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param adder a {@link Reducer} that adds a new value to the aggregate result
+ * @param subtractor a {@link Reducer} that removes an old value from the aggregate result
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ KTable<K, V> reduce(final Reducer<V> adder,
+ final Reducer<V> subtractor);
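
A minimal sketch of the adder/subtractor contract described above for a running sum, assuming a KGroupedTable<String, Integer> named groupedTable and Java 8 lambdas; the store name is illustrative only:

    KTable<String, Integer> sums = groupedTable.reduce(
        (aggValue, newValue) -> aggValue + newValue,  // adder: record added or updated
        (aggValue, oldValue) -> aggValue - oldValue,  // subtractor: old record "replaced"
        "sums-store");

    // same aggregation, but materialized under an internal, non-queryable name
    KTable<String, Integer> internalSums = groupedTable.reduce(
        (aggValue, newValue) -> aggValue + newValue,
        (aggValue, oldValue) -> aggValue - oldValue);
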
/**
* Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -238,8 +322,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * String storeName = storeSupplier.name();
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -247,15 +331,15 @@ public interface KGroupedTable<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param adder a {@link Reducer} that adds a new value to the aggregate result
* @param subtractor a {@link Reducer} that removed an old value from the aggregate result
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
@@ -319,7 +403,7 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -327,16 +411,17 @@ public interface KGroupedTable<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param initializer a {@link Initializer} that provides an initial aggregate result value
* @param adder a {@link Aggregator} that adds a new record to the aggregate result
* @param subtractor a {@link Aggregator} that removed an old record from the aggregate result
- * @param storeName the name of the underlying {@link KTable} state store
+ * @param queryableStoreName the name of the underlying {@link KTable} state store.
+ * If {@code null}, this is equivalent to {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator)}.
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
@@ -344,7 +429,7 @@ public interface KGroupedTable<K, V> {
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
- final String storeName);
+ final String queryableStoreName);
/**
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -352,8 +437,79 @@ public interface KGroupedTable<K, V> {
* Records with {@code null} key are ignored.
* Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
* for example, allows the result to have a different type than the input values.
+ * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
+ * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
+ * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
* The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
- * that can be queried using the provided {@code storeName}.
+ * provided by the given {@code storeSupplier}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+ * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+ * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+ * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable}
+ * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+ * record from the aggregate.
+ * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+ * like sum.
+ * For sum, the initializer, adder, and subtractor would work as follows:
+ * <pre>{@code
+ * // in this example, LongSerde.class must be set as default value serde in StreamsConfig
+ * public class SumInitializer implements Initializer<Long> {
+ * public Long apply() {
+ * return 0L;
+ * }
+ * }
+ *
+ * public class SumAdder implements Aggregator<String, Integer, Long> {
+ * public Long apply(String key, Integer newValue, Long aggregate) {
+ * return aggregate + newValue;
+ * }
+ * }
+ *
+ * public class SumSubtractor implements Aggregator<String, Integer, Long> {
+ * public Long apply(String key, Integer oldValue, Long aggregate) {
+ * return aggregate - oldValue;
+ * }
+ * }
+ * }</pre>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param initializer a {@link Initializer} that provides an initial aggregate result value
+ * @param adder a {@link Aggregator} that adds a new record to the aggregate result
+ * @param subtractor a {@link Aggregator} that removes an old record from the aggregate result
+ * @param <VR> the value type of the aggregated {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> adder,
+ final Aggregator<? super K, ? super V, VR> subtractor);
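
A sketch of this overload for the sum example above, assuming a KGroupedTable<String, Integer> and that a Long serde is configured as the default value serde (as the example's own comment notes); all names are illustrative:

    KTable<String, Long> sums = groupedTable.aggregate(
        () -> 0L,                                            // initializer
        (key, newValue, aggregate) -> aggregate + newValue,  // adder
        (key, oldValue, aggregate) -> aggregate - oldValue); // subtractor
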
+
+
+ /**
+ * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+ * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+ * Records with {@code null} key are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+ * for example, allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
* Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
* <p>
* The specified {@link Initializer} is applied once directly before the first input record is processed to
@@ -398,7 +554,7 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -406,10 +562,10 @@ public interface KGroupedTable<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
* '.', '_' and '-'.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
@@ -419,8 +575,8 @@ public interface KGroupedTable<K, V> {
* @param subtractor a {@link Aggregator} that removed an old record from the aggregate result
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
- * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
- * alphanumerics, '.', '_' and '-'
+ * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+ * alphanumerics, '.', '_' and '-'. If {@code null}, this is equivalent to {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde)}.
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
@@ -429,7 +585,78 @@ public interface KGroupedTable<K, V> {
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final Serde<VR> aggValueSerde,
- final String storeName);
+ final String queryableStoreName);
+
+ /**
+ * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+ * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+ * Records with {@code null} key are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+ * for example, allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+ * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+ * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+ * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable}
+ * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+ * record from the aggregate.
+ * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+ * like sum.
+ * For sum, the initializer, adder, and subtractor would work as follows:
+ * <pre>{@code
+ * public class SumInitializer implements Initializer<Long> {
+ * public Long apply() {
+ * return 0L;
+ * }
+ * }
+ *
+ * public class SumAdder implements Aggregator<String, Integer, Long> {
+ * public Long apply(String key, Integer newValue, Long aggregate) {
+ * return aggregate + newValue;
+ * }
+ * }
+ *
+ * public class SumSubtractor implements Aggregator<String, Integer, Long> {
+ * public Long apply(String key, Integer oldValue, Long aggregate) {
+ * return aggregate - oldValue;
+ * }
+ * }
+ * }</pre>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param initializer a {@link Initializer} that provides an initial aggregate result value
+ * @param adder a {@link Aggregator} that adds a new record to the aggregate result
+ * @param subtractor a {@link Aggregator} that removes an old record from the aggregate result
+ * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+ * if not specified the default serdes defined in the configs will be used
+ * @param <VR> the value type of the aggregated {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> adder,
+ final Aggregator<? super K, ? super V, VR> subtractor,
+ final Serde<VR> aggValueSerde);
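
The same aggregation with an explicit result serde instead of the configured default, assuming org.apache.kafka.common.serialization.Serdes is available; the resulting store name is internal and not meant for Interactive Queries:

    KTable<String, Long> sums = groupedTable.aggregate(
        () -> 0L,
        (key, newValue, aggregate) -> aggregate + newValue,
        (key, oldValue, aggregate) -> aggregate - oldValue,
        Serdes.Long());  // serde for the aggregate result type
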
+
/**
* Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -483,8 +710,8 @@ public interface KGroupedTable<K, V> {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // counting words
- * String storeName = storeSupplier.name();
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String queryableStoreName = storeSupplier.name();
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-word";
* Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -492,16 +719,16 @@ public interface KGroupedTable<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
- * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
- * provide {@code storeName}, and "-changelog" is a fixed suffix.
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param initializer a {@link Initializer} that provides an initial aggregate result value
* @param adder a {@link Aggregator} that adds a new record to the aggregate result
* @param subtractor a {@link Aggregator} that removed an old record from the aggregate result
- * @param storeSupplier user defined state store supplier
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
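
Pulling the recurring Interactive Queries snippet together, a sketch of querying a store materialized under a {@code queryableStoreName}; the store and key names are placeholders:

    KafkaStreams streams = ... // started application that materialized "counts-store"
    ReadOnlyKeyValueStore<String, Long> localStore =
        streams.store("counts-store", QueryableStoreTypes.<String, Long>keyValueStore());
    Long countForWord = localStore.get("some-word"); // key must be hosted by this instance

    // keys hosted on another instance require a custom RPC layer; the routing
    // metadata can be discovered via streams.allMetadata()
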
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index c361cad..0e02c8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,6 +34,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.util.Collections;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -243,7 +243,7 @@ public class KStreamBuilder extends TopologyBuilder {
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
+ * {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -251,7 +251,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -259,12 +259,103 @@ public class KStreamBuilder extends TopologyBuilder {
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null}, this is equivalent to {@link KStreamBuilder#table(String)}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final String topic,
- final String storeName) {
- return table(null, null, null, topic, storeName);
+ final String queryableStoreName) {
+ return table(null, null, null, topic, queryableStoreName);
+ }
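
For illustration, the two-argument overload next to the new overloads introduced below; topic and store names are placeholders:

    KStreamBuilder builder = new KStreamBuilder();

    // materialized under the given, queryable store name
    KTable<String, String> users = builder.table("users-topic", "users-store");

    // materialized under a generated internal name (see table(String) below);
    // the resulting store is not meant for Interactive Queries
    KTable<String, String> usersInternal = builder.table("users-topic");
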
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+ * {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return table(null, null, null, topic, storeSupplier);
+ }
+
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+ * {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final String topic) {
+ return table(null, null, null, topic, (String) null);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null}, this is equivalent to {@link KStreamBuilder#table(AutoOffsetReset, String)}.
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+ final String topic,
+ final String queryableStoreName) {
+ return table(offsetReset, null, null, topic, queryableStoreName);
}
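
A sketch of the same call with an explicit reset policy, assuming the TopologyBuilder.AutoOffsetReset enum that KStreamBuilder inherits; names are illustrative:

    KTable<String, String> users = builder.table(
        TopologyBuilder.AutoOffsetReset.EARLIEST, // only used when no committed offsets exist
        "users-topic",
        "users-store");
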
/**
@@ -276,7 +367,7 @@ public class KStreamBuilder extends TopologyBuilder {
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
+ * {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -284,7 +375,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -294,13 +385,36 @@ public class KStreamBuilder extends TopologyBuilder {
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
* offsets are available
* @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
final String topic,
- final String storeName) {
- return table(offsetReset, null, null, topic, storeName);
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return table(offsetReset, null, null, topic, storeSupplier);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+ final String topic) {
+ return table(offsetReset, null, null, topic, (String) null);
}
/**
@@ -312,7 +426,7 @@ public class KStreamBuilder extends TopologyBuilder {
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
+ * {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -320,7 +434,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -332,25 +446,26 @@ public class KStreamBuilder extends TopologyBuilder {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null}, this is equivalent to {@link KStreamBuilder#table(Serde, Serde, String)}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final String storeName) {
- return table(null, keySerde, valSerde, topic, storeName);
+ final String queryableStoreName) {
+ return table(null, keySerde, valSerde, topic, queryableStoreName);
}
/**
* Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
+ * {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -358,7 +473,94 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return table(null, keySerde, valSerde, topic, storeSupplier);
+ }
+
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic) {
+ return table(null, keySerde, valSerde, topic, (String) null);
+ }
+
+ private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final boolean isQueryable) {
+ final String source = newName(KStreamImpl.SOURCE_NAME);
+ final String name = newName(KTableImpl.SOURCE_NAME);
+ final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+
+ addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(),
+ valSerde == null ? null : valSerde.deserializer(),
+ topic);
+ addProcessor(name, processorSupplier, source);
+
+ final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
+ keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
+
+ addStateStore(storeSupplier, name);
+ connectSourceStoreAndTopic(storeSupplier.name(), topic);
+
+ return kTable;
+ }
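
All public table(...) overloads funnel into doTable(...): when no queryableStoreName is given, a generated internal name is used and isQueryable is false. For example, relying only on overloads added in this patch, the following two calls build equivalent, non-queryable tables (serdes and topic are illustrative):

    KTable<String, String> t1 = builder.table(Serdes.String(), Serdes.String(), "users-topic");
    KTable<String, String> t2 = builder.table(Serdes.String(), Serdes.String(), "users-topic", (String) null);
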
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -372,37 +574,142 @@ public class KStreamBuilder extends TopologyBuilder {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null}, this is equivalent to {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final String storeName) {
- final String source = newName(KStreamImpl.SOURCE_NAME);
- final String name = newName(KTableImpl.SOURCE_NAME);
- final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
-
- addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic);
- addProcessor(name, processorSupplier, source);
-
- final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName);
+ final String queryableStoreName) {
+ final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+ final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+ keySerde,
+ valSerde,
+ false,
+ Collections.<String, String>emptyMap(),
+ true);
+ return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, queryableStoreName != null);
+ }
- // only materialize the KTable into a state store if the storeName is not null
- if (storeName != null) {
- final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
- keySerde,
- valSerde,
- false,
- Collections.<String, String>emptyMap(),
- true);
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic) {
+ return table(offsetReset, keySerde, valSerde, topic, (String) null);
+ }
+ /**
+ * Create a {@link KTable} for the specified topic.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * Note that the specified input topics must be partitioned by key.
+ * If this is not the case the returned {@link KTable} will be corrupted.
+ * <p>
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param storeSupplier user-defined state store supplier; cannot be {@code null}.
+ * @return a {@link KTable} for the specified topic
+ */
+ public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, true);
+ }
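As a sketch of how a caller might construct the storeSupplier argument, assuming the Stores.create(...) builder from org.apache.kafka.streams.state yields a StateStoreSupplier<KeyValueStore> in this version; store and topic names are made up for the example:

    // a persistent (RocksDB-backed) key-value store supplier, materialized as "orders-store"
    StateStoreSupplier<KeyValueStore> supplier = Stores.create("orders-store")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .build();

    KTable<String, Long> orders = builder.table(
        TopologyBuilder.AutoOffsetReset.LATEST,
        Serdes.String(),
        Serdes.Long(),
        "orders",        // input topic (assumed name)
        supplier);       // table is materialized under the supplier's store name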
- addStateStore(storeSupplier, name);
- connectSourceStoreAndTopic(storeName, topic);
- }
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+ * {@code queryableStoreName}.
+ * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ...
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long valueForKey = localStore.get(key);
+ * }</pre>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(String)}.
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
+ final String queryableStoreName) {
+ return globalTable(null, null, topic, queryableStoreName);
+ }
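A minimal sketch of the queryable variant, reusing the assumed `builder` and `props` from the earlier examples with made-up topic and store names; since a {@link GlobalKTable} is bootstrapped with the full contents of its topic on every instance, local gets do not need an RPC layer:

    GlobalKTable<String, String> reference =
        builder.globalTable("reference-data", "reference-store"); // assumed names

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    ReadOnlyKeyValueStore<String, String> store =
        streams.store("reference-store", QueryableStoreTypes.<String, String>keyValueStore());
    String value = store.get("some-key"); // all keys of the topic are available locally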
- return kTable;
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
+ return globalTable(null, null, topic, (String) null);
}
/**
@@ -411,7 +718,7 @@ public class KStreamBuilder extends TopologyBuilder {
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
+ * {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -419,20 +726,33 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
+ * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)}.
* @return a {@link GlobalKTable} for the specified topic
*/
- public <K, V> GlobalKTable<K, V> globalTable(final String topic,
- final String storeName) {
- return globalTable(null, null, topic, storeName);
+ @SuppressWarnings("unchecked")
+ public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final String queryableStoreName) {
+ final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+ return doGlobalTable(keySerde, valSerde, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+ keySerde,
+ valSerde,
+ false,
+ Collections.<String, String>emptyMap(),
+ true));
}
/**
@@ -441,7 +761,7 @@ public class KStreamBuilder extends TopologyBuilder {
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
+ * {@code queryableStoreName}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -449,7 +769,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
@@ -461,31 +781,60 @@ public class KStreamBuilder extends TopologyBuilder {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
+ * @param storeSupplier user-defined state store supplier; cannot be {@code null}.
* @return a {@link GlobalKTable} for the specified topic
*/
@SuppressWarnings("unchecked")
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic,
- final String storeName) {
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return doGlobalTable(keySerde, valSerde, topic, storeSupplier);
+ }
+
+ private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
final String sourceName = newName(KStreamImpl.SOURCE_NAME);
final String processorName = newName(KTableImpl.SOURCE_NAME);
- final KTableSource<K, V> tableSource = new KTableSource<>(storeName);
+ final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
- final StateStore store = new RocksDBKeyValueStoreSupplier<>(storeName,
- keySerde,
- valSerde,
- false,
- Collections.<String, String>emptyMap(),
- true).get();
+ addGlobalStore(storeSupplier, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+ return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+ }
- addGlobalStore(store, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
- return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeName));
+ /**
+ * Create a {@link GlobalKTable} for the specified topic.
+ * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+ * Input {@link KeyValue records} with {@code null} key will be dropped.
+ * <p>
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+ * store name. Note that this store name may not be queryable through Interactive Queries.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+ * <p>
+ * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+ * regardless of the specified value in {@link StreamsConfig}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @return a {@link GlobalKTable} for the specified topic
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic) {
+
+ return globalTable(keySerde, valSerde, topic, (String) null);
}
/**
@@ -512,4 +861,16 @@ public class KStreamBuilder extends TopologyBuilder {
return prefix + String.format("%010d", index.getAndIncrement());
}
+ /**
+ * <strong>This function is for internal usage only and should not be called.</strong>
+ * <p>
+ * Create a unique state store name.
+ *
+ * @param prefix processor name prefix
+ * @return a new unique name
+ */
+ public String newStoreName(final String prefix) {
+ return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
+ }
+
}
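For context on what the names generated by newStoreName(...) are used for, a short sketch reusing the builder and names assumed in the earlier examples: tables created without an explicit queryableStoreName are materialized under such a generated internal name, so they are not meant to be looked up via KafkaStreams#store(...):

    // internal store name generated by newStoreName(...); not intended for interactive queries
    GlobalKTable<String, String> anonymous = builder.globalTable("reference-data");

    // queryable by name only when a store name is supplied explicitly
    GlobalKTable<String, String> named = builder.globalTable("reference-data", "reference-store");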