You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/18 10:54:19 UTC
kafka git commit: KAFKA-5654;
add materialized count, reduce, aggregate to KGroupedStream
Repository: kafka
Updated Branches:
refs/heads/trunk 346d0ca53 -> d83252eba
KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream
Add overloads of `count`, `reduce`, and `aggregate` that are `Materialized` to `KGroupedStream`.
Refactor common parts between `KGroupedStream` and `WindowedKStream`
Author: Damian Guy <da...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3827 from dguy/kafka-5654
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d83252eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d83252eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d83252eb
Branch: refs/heads/trunk
Commit: d83252ebaeeca5bf19584908d95b424beb31b12e
Parents: 346d0ca
Author: Damian Guy <da...@gmail.com>
Authored: Mon Sep 18 11:54:14 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 18 11:54:14 2017 +0100
----------------------------------------------------------------------
.../kafka/streams/kstream/KGroupedStream.java | 210 ++++++++++++++++++-
.../GroupedStreamAggregateBuilder.java | 76 +++++++
.../kstream/internals/KGroupedStreamImpl.java | 127 +++++++----
.../streams/kstream/internals/KStreamImpl.java | 25 +--
.../kstream/internals/MaterializedInternal.java | 13 +-
.../kstream/internals/WindowedKStreamImpl.java | 57 ++---
.../internals/KGroupedStreamImplTest.java | 106 ++++++++++
7 files changed, 515 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index f12c2b2..08916ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -146,6 +147,38 @@ public interface KGroupedStream<K, V> {
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
+ * Count the number of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * configured by the given {@code materialized} instance.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * <pre>{@code
+ * KafkaStreams streams = ... // counting words
+ * String queryableStoreName = "count-store"; // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-word";
+ * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+ * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+ * represent the latest (rolling) count (i.e., number of records) for each key
+ */
+ KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
+
+ /**
* Count the number of records in this stream by the grouped key and the defined windows.
* Records with {@code null} key or value are ignored.
* The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
@@ -395,7 +428,7 @@ public interface KGroupedStream<K, V> {
* and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
*/
@@ -452,12 +485,14 @@ public interface KGroupedStream<K, V> {
* provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
- * @param reducer a {@link Reducer} that computes a new aggregate result
- * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+ * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)} ()}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
+ * @deprecated use {@link #reduce(Reducer, Materialized)}
*/
+ @Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
final String queryableStoreName);
@@ -507,15 +542,69 @@ public interface KGroupedStream<K, V> {
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
+ * @deprecated use {@link #reduce(Reducer, Materialized)}
*/
+ @Deprecated
KTable<K, V> reduce(final Reducer<V> reducer,
final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
+ * Combine the value of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Combining implies that the type of the aggregate result is the same as the type of the input value
+ * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * configured by the given {@code materialized} instance.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+ * aggregate (first argument) and the record's value (second argument):
+ * <pre>{@code
+ * // At the example of a Reducer<Long>
+ * new Reducer<Long>() {
+ * @Override
+ * public Long apply(Long aggValue, Long currValue) {
+ * return aggValue + currValue;
+ * }
+ * }</pre>
+ * <p>
+ * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+ * value as-is.
+ * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or
+ * max.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+ * <pre>{@code
+ * KafkaStreams streams = ... // compute sum
+ * String queryableStoreName = "storeName"; // the queryableStoreName should be the name of the store as defined by the Materialized instance
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ *
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ KTable<K, V> reduce(final Reducer<V> reducer,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+
+ /**
* Combine the number of records in this stream by the grouped key and the defined windows.
* Records with {@code null} key or value are ignored.
* Combining implies that the type of the aggregate result is the same as the type of the input value
@@ -678,7 +767,7 @@ public interface KGroupedStream<K, V> {
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param reducer a {@link Reducer} that computes a new aggregate result
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param windows the specification of the aggregation {@link Windows}
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -743,7 +832,7 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
- * @param reducer the instance of {@link Reducer}
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
* alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)} ()}.
@@ -778,7 +867,7 @@ public interface KGroupedStream<K, V> {
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
- * @param reducer the instance of {@link Reducer}
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
@@ -841,7 +930,7 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
* provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
- * @param reducer the instance of {@link Reducer}
+ * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -905,7 +994,9 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
*/
+ @Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde,
@@ -935,6 +1026,105 @@ public interface KGroupedStream<K, V> {
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
* <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation on value type Long
+ * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
+ * alphanumerics, '.', '_' and '-'.
+ * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+ * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+
+ /**
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+ * allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
+ * count (c.f. {@link #count(String)}).
+ * <p>
+ * The default value serde from config will be used for serializing the result.
+ * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+ * and "-changelog" is a fixed suffix.
+ * Note that the internal store name may not be queryable through Interactive Queries.
+ * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param <VR> the value type of the resulting {@link KTable}
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+ * latest (rolling) aggregate for each key
+ */
+ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator);
+ /**
+ * Aggregate the values of records in this stream by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+ * allows the result to have a different type than the input values.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried using the provided {@code queryableStoreName}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * The specified {@link Initializer} is applied once directly before the first input record is processed to
+ * provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+ * aggregate (or for the very first record using the intermediate aggregation result provided via the
+ * {@link Initializer}) and the record's value.
+ * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+ * count (c.f. {@link #count(String)}).
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+ * the same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
@@ -950,7 +1140,9 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
*/
+ @Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde);
@@ -999,7 +1191,9 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
*/
+ @Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final StateStoreSupplier<KeyValueStore> storeSupplier);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
new file mode 100644
index 0000000..ef0cdfc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Collections;
+import java.util.Set;
+
+class GroupedStreamAggregateBuilder<K, V> {
+ private final InternalStreamsBuilder builder;
+ private final Serde<K> keySerde;
+ private final Serde<V> valueSerde;
+ private final boolean repartitionRequired;
+ private final Set<String> sourceNodes;
+ private final String name;
+
+ GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final boolean repartitionRequired,
+ final Set<String> sourceNodes,
+ final String name) {
+
+ this.builder = builder;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ this.repartitionRequired = repartitionRequired;
+ this.sourceNodes = sourceNodes;
+ this.name = name;
+ }
+
+ <T> KTable<K, T> build(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+ final String functionName,
+ final StoreBuilder storeBuilder,
+ final boolean isQueryable) {
+ final String aggFunctionName = builder.newName(functionName);
+ final String sourceName = repartitionIfRequired(storeBuilder.name());
+ builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
+ builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
+
+ return new KTableImpl<>(
+ builder,
+ aggFunctionName,
+ aggregateSupplier,
+ sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
+ storeBuilder.name(),
+ isQueryable);
+ }
+
+ /**
+ * @return the new sourceName if repartitioned. Otherwise the name of this stream
+ */
+ private String repartitionIfRequired(final String queryableStoreName) {
+ if (!repartitionRequired) {
+ return this.name;
+ }
+ return KStreamImpl.createReparitionedSource(builder, keySerde, valueSerde, queryableStoreName, name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 57114b5..1fab2c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -18,10 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
@@ -32,6 +34,7 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Collections;
@@ -46,6 +49,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private final boolean repartitionRequired;
+ private final Initializer<Long> countInitializer = new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ };
+ private final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>() {
+ @Override
+ public Long apply(K aggKey, V value, Long aggregate) {
+ return aggregate + 1;
+ }
+ };
+ private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
private boolean isQueryable = true;
KGroupedStreamImpl(final InternalStreamsBuilder builder,
@@ -55,6 +71,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Serde<V> valSerde,
final boolean repartitionRequired) {
super(builder, name, sourceNodes);
+ this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder,
+ keySerde,
+ valSerde,
+ repartitionRequired,
+ sourceNodes,
+ name);
this.keySerde = keySerde;
this.valSerde = valSerde;
this.repartitionRequired = repartitionRequired;
@@ -91,6 +113,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
storeSupplier);
}
+ @Override
+ public KTable<K, V> reduce(final Reducer<V> reducer,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(reducer, "reducer can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
+ = new MaterializedInternal<>(materialized);
+ return doAggregate(
+ new KStreamReduce<K, V>(materializedInternal.storeName(), reducer),
+ REDUCE_NAME,
+ materializedInternal);
+ }
+
@Override
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -131,6 +166,41 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
@Override
+ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return aggregateMaterialized(initializer, aggregator, materialized);
+ }
+
+ private <VR> KTable<K, VR> aggregateMaterialized(final Initializer<VR> initializer,
+ final Aggregator<? super K, ? super V, VR> aggregator,
+ final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+ final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
+ = new MaterializedInternal<>(materialized);
+ return doAggregate(
+ new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
+ AGGREGATE_NAME,
+ materializedInternal);
+ }
+
+ @Override
+ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ final String storeName = builder.newStoreName(AGGREGATE_NAME);
+
+ MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+ new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false);
+ return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
+ AGGREGATE_NAME,
+ materializedInternal);
+
+ }
+
+ @Override
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Serde<T> aggValueSerde) {
@@ -198,17 +268,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
@Override
public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
- return aggregate(new Initializer<Long>() {
- @Override
- public Long apply() {
- return 0L;
- }
- }, new Aggregator<K, V, Long>() {
- @Override
- public Long apply(K aggKey, V value, Long aggregate) {
- return aggregate + 1;
- }
- }, storeSupplier);
+ return aggregate(countInitializer, countAggregator, storeSupplier);
+ }
+
+ @Override
+ public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+ return aggregate(countInitializer, countAggregator, materialized);
}
@Override
@@ -227,17 +292,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
final StateStoreSupplier<WindowStore> storeSupplier) {
return aggregate(
- new Initializer<Long>() {
- @Override
- public Long apply() {
- return 0L;
- }
- }, new Aggregator<K, V, Long>() {
- @Override
- public Long apply(K aggKey, V value, Long aggregate) {
- return aggregate + 1;
- }
- },
+ countInitializer,
+ countAggregator,
windows,
storeSupplier);
}
@@ -320,18 +376,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final StateStoreSupplier<SessionStore> storeSupplier) {
Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- final Initializer<Long> initializer = new Initializer<Long>() {
- @Override
- public Long apply() {
- return 0L;
- }
- };
- final Aggregator<K, V, Long> aggregator = new Aggregator<K, V, Long>() {
- @Override
- public Long apply(final K aggKey, final V value, final Long aggregate) {
- return aggregate + 1;
- }
- };
final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
@Override
public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
@@ -339,7 +383,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
}
};
- return aggregate(initializer, aggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
+ return aggregate(countInitializer, countAggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
}
@@ -397,6 +441,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
return aggregate(initializer, aggregator, sessionMerger, sessionWindows, valSerde, storeSupplier);
}
+
+ private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+ final String functionName,
+ final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+
+ final StoreBuilder<KeyValueStore<K, T>> storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal)
+ .materialize();
+ return aggregateBuilder.build(aggregateSupplier, functionName, storeBuilder, materializedInternal.isQueryable());
+
+ }
+
private <T> KTable<K, T> doAggregate(
final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
final String functionName,
@@ -426,6 +481,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
if (!repartitionRequired) {
return this.name;
}
- return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
+ return KStreamImpl.createReparitionedSource(builder, keySerde, valSerde, queryableStoreName, name);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7201a00..6ebbd14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -605,37 +605,38 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
*/
private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
final Serde<V> valSerde) {
- String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, null);
+ String repartitionedSourceName = createReparitionedSource(builder, keySerde, valSerde, null, name);
return new KStreamImpl<>(builder, repartitionedSourceName, Collections
.singleton(repartitionedSourceName), false);
}
- static <K1, V1> String createReparitionedSource(final AbstractStream<K1> stream,
+ static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
final Serde<K1> keySerde,
final Serde<V1> valSerde,
- final String topicNamePrefix) {
+ final String topicNamePrefix,
+ final String name) {
Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
- String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;
+ String baseName = topicNamePrefix != null ? topicNamePrefix : name;
String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
- String sinkName = stream.builder.newName(SINK_NAME);
- String filterName = stream.builder.newName(FILTER_NAME);
- String sourceName = stream.builder.newName(SOURCE_NAME);
+ String sinkName = builder.newName(SINK_NAME);
+ String filterName = builder.newName(FILTER_NAME);
+ String sourceName = builder.newName(SOURCE_NAME);
- stream.builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
- stream.builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
+ builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
+ builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
@Override
public boolean test(final K1 key, final V1 value) {
return key != null;
}
- }, false), stream.name);
+ }, false), name);
- stream.builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
+ builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
null, filterName);
- stream.builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
+ builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
keyDeserializer, valDeserializer, repartitionTopic);
return sourceName;
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index d7ebc65..0ee610f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -25,8 +25,15 @@ import java.util.Map;
public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
+ private final boolean queryable;
+
public MaterializedInternal(final Materialized<K, V, S> materialized) {
+ this(materialized, true);
+ }
+
+ MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) {
super(materialized);
+ this.queryable = queryable;
}
public String storeName() {
@@ -56,7 +63,11 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
return topicConfig;
}
- public boolean cachingEnabled() {
+ boolean cachingEnabled() {
return cachingEnabled;
}
+
+ public boolean isQueryable() {
+ return queryable;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
index 28666b8..b6e38f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
-import java.util.Collections;
import java.util.Objects;
import java.util.Set;
@@ -40,9 +39,9 @@ import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDU
public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
private final Windows<W> windows;
- private final boolean repartitionRequired;
private final Serde<K> keySerde;
private final Serde<V> valSerde;
+ private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
WindowedKStreamImpl(final Windows<W> windows,
final InternalStreamsBuilder builder,
@@ -55,8 +54,8 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
Objects.requireNonNull(windows, "windows can't be null");
this.valSerde = valSerde;
this.keySerde = keySerde;
- this.repartitionRequired = repartitionRequired;
this.windows = windows;
+ this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
}
@Override
@@ -76,38 +75,34 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
Serdes.Long());
}
+ @SuppressWarnings("unchecked")
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
- final String aggFunctionName = builder.newName(AGGREGATE_NAME);
final String storeName = builder.newStoreName(AGGREGATE_NAME);
- return doAggregate(aggValueSerde,
- aggFunctionName,
- storeName,
- new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator));
+ return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
+ AGGREGATE_NAME,
+ windowStoreBuilder(storeName, aggValueSerde),
+ false);
}
+ @SuppressWarnings("unchecked")
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
Objects.requireNonNull(reducer, "reducer can't be null");
final String storeName = builder.newStoreName(REDUCE_NAME);
- return doAggregate(valSerde,
- builder.newName(REDUCE_NAME),
- storeName,
- new KStreamWindowReduce<>(windows, storeName, reducer));
+ return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
+ REDUCE_NAME,
+ windowStoreBuilder(storeName, valSerde),
+ false);
}
- @SuppressWarnings("unchecked")
- private <VR> KTable<Windowed<K>, VR> doAggregate(final Serde<VR> aggValueSerde,
- final String aggFunctionName,
- final String storeName,
- final KStreamAggProcessorSupplier aggSupplier) {
- final String sourceName = repartitionIfRequired(storeName);
- final StoreBuilder<WindowStore<K, VR>> storeBuilder = Stores.windowStoreBuilder(
+ private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
+ return Stores.windowStoreBuilder(
Stores.persistentWindowStore(
storeName,
windows.maintainMs(),
@@ -115,29 +110,7 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
windows.size(),
false),
keySerde,
- aggValueSerde)
- .withCachingEnabled();
-
- builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggSupplier, sourceName);
- builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
-
- return new KTableImpl<>(
- builder,
- aggFunctionName,
- aggSupplier,
- sourceName.equals(this.name) ? sourceNodes
- : Collections.singleton(sourceName),
- storeName,
- false);
+ aggValueSerde).withCachingEnabled();
}
- /**
- * @return the new sourceName if repartitioned. Otherwise the name of this stream
- */
- private String repartitionIfRequired(final String queryableStoreName) {
- if (!repartitionRequired) {
- return this.name;
- }
- return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index bc65e09..efa027c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
@@ -486,6 +488,110 @@ public class KGroupedStreamImplTest {
groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
}
+ @SuppressWarnings("unchecked")
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
+ groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
+ groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
+ groupedStream.count((Materialized) null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldCountAndMaterializeResults() {
+ groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long()));
+
+ processData();
+
+ final KeyValueStore<String, Long> count = (KeyValueStore<String, Long>) driver.allStateStores().get("count");
+
+ assertThat(count.get("1"), equalTo(3L));
+ assertThat(count.get("2"), equalTo(1L));
+ assertThat(count.get("3"), equalTo(2L));
+ }
+
+
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReduceAndMaterializeResults() {
+ groupedStream.reduce(MockReducer.STRING_ADDER,
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
+
+ processData();
+
+ final KeyValueStore<String, String> reduced = (KeyValueStore<String, String>) driver.allStateStores().get("reduce");
+
+ assertThat(reduced.get("1"), equalTo("A+C+D"));
+ assertThat(reduced.get("2"), equalTo("B"));
+ assertThat(reduced.get("3"), equalTo("E+F"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldAggregateAndMaterializeResults() {
+ groupedStream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String()));
+
+ processData();
+
+ final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
+
+ assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
+ assertThat(aggregate.get("2"), equalTo("0+B"));
+ assertThat(aggregate.get("3"), equalTo("0+E+F"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldAggregateWithDefaultSerdes() {
+ final Map<String, String> results = new HashMap<>();
+ groupedStream.aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER)
+ .toStream()
+ .foreach(new ForeachAction<String, String>() {
+ @Override
+ public void apply(final String key, final String value) {
+ results.put(key, value);
+ }
+ });
+
+ processData();
+
+ assertThat(results.get("1"), equalTo("0+A+C+D"));
+ assertThat(results.get("2"), equalTo("0+B"));
+ assertThat(results.get("3"), equalTo("0+E+F"));
+ }
+
+ private void processData() {
+ driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+ driver.setTime(0);
+ driver.process(TOPIC, "1", "A");
+ driver.process(TOPIC, "2", "B");
+ driver.process(TOPIC, "1", "C");
+ driver.process(TOPIC, "1", "D");
+ driver.process(TOPIC, "3", "E");
+ driver.process(TOPIC, "3", "F");
+ driver.flushState();
+ }
+
private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
driver.setUp(builder, TestUtils.tempDirectory(), 0);
driver.setTime(0);