You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/12/01 03:37:32 UTC
[kafka] branch trunk updated: KAFKA-6049: Add non-windowed Cogroup
operator (KIP-150) (#7538)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0b8ea7e KAFKA-6049: Add non-windowed Cogroup operator (KIP-150) (#7538)
0b8ea7e is described below
commit 0b8ea7e162e68662fbda6893d188862999d9b427
Author: wcarlson5 <18...@users.noreply.github.com>
AuthorDate: Sat Nov 30 19:37:04 2019 -0800
KAFKA-6049: Add non-windowed Cogroup operator (KIP-150) (#7538)
Reviewer: Matthias J. Sax <ma...@confluent.io>
---
.../kafka/streams/kstream/CogroupedKStream.java | 268 ++++++++++
.../kafka/streams/kstream/KGroupedStream.java | 18 +
.../org/apache/kafka/streams/kstream/KStream.java | 6 +
.../streams/kstream/internals/AbstractStream.java | 10 +-
.../kstream/internals/CogroupedKStreamImpl.java | 107 ++++
.../internals/CogroupedStreamAggregateBuilder.java | 146 ++++++
.../kstream/internals/KGroupedStreamImpl.java | 8 +
.../streams/kstream/internals/KStreamImpl.java | 8 +-
.../streams/kstream/internals/KStreamImplJoin.java | 2 +-
.../streams/kstream/internals/KTableImpl.java | 2 +-
.../{KStreamPassThrough.java => PassThrough.java} | 6 +-
.../internals/CogroupedKStreamImplTest.java | 551 +++++++++++++++++++++
.../kstream/internals/KGroupedStreamImplTest.java | 10 +-
13 files changed, 1124 insertions(+), 18 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
new file mode 100644
index 0000000..a9b66d8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+/**
+ * {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
+ * <p>
+ * It is an intermediate representation after a grouping of {@link KStream}s, before the
+ * aggregations are applied to the new partitions resulting in a {@link KTable}.
+ * <p>
+ * A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
+ * {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
+ *
+ * @param <K> Type of keys
+ * @param <VOut> Type of the aggregated (output) values
+ */
+public interface CogroupedKStream<K, VOut> {
+
+ /**
+ * Add an already {@link KGroupedStream grouped KStream} to this {@code CogroupedKStream}.
+ * <p>
+ * The added {@link KGroupedStream grouped KStream} must have the same number of partitions as all existing
+ * streams of this {@code CogroupedKStream}.
+ * If this is not the case, you would need to call {@link KStream#through(String)} before
+ * {@link KStream#groupByKey() grouping} the {@link KStream}, using a pre-created topic with the "correct" number of
+ * partitions.
+ * <p>
+ * The specified {@link Aggregator} is applied in the actual {@link #aggregate(Initializer) aggregation} step for
+ * each input record and computes a new aggregate using the current aggregate (or for the very first record per key
+ * using the initial intermediate aggregation result provided via the {@link Initializer} that is passed into
+ * {@link #aggregate(Initializer)}) and the record's value.
+ *
+ * @param groupedStream a grouped stream to add to this {@code CogroupedKStream}
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param <VIn> Type of input values
+ * @return a {@code CogroupedKStream}
+ */
+ <VIn> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, VIn> groupedStream,
+ final Aggregator<? super K, ? super VIn, VOut> aggregator);
+
+ /**
+ * Aggregate the values of records in these streams by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * with a generated name; use an overload that takes {@link Materialized} to choose the store name.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * To compute the aggregation the corresponding {@link Aggregator} as specified in
+ * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+ * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+ * same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation on value type Long
+ * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
+ * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is a generated value, and
+ * "-changelog" is a fixed suffix.
+ * <p>
+ * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation
+ * result. Cannot be {@code null}.
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+ * represent the latest (rolling) aggregate for each key
+ */
+ KTable<K, VOut> aggregate(final Initializer<VOut> initializer);
+
+ /**
+ * Aggregate the values of records in these streams by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * with a generated name; use an overload that takes {@link Materialized} to choose the store name.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * To compute the aggregation the corresponding {@link Aggregator} as specified in
+ * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+ * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Named} is used to name the processor combining the grouped streams.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+ * same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation on value type Long
+ * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
+ * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is a generated value, and
+ * "-changelog" is a fixed suffix.
+ * <p>
+ * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation
+ * result. Cannot be {@code null}.
+ * @param named a {@link Named} config used to name the processor. Cannot be {@code null}.
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+ * represent the latest (rolling) aggregate for each key
+ */
+ KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+ final Named named);
+
+ /**
+ * Aggregate the values of records in these streams by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried by the given store name in {@code materialized}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * To compute the aggregation the corresponding {@link Aggregator} as specified in
+ * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+ * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+ * same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation on value type Long
+ * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
+ * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the provided store name defined
+ * in {@code Materialized}, and "-changelog" is a fixed suffix.
+ * <p>
+ * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation
+ * result. Cannot be {@code null}.
+ * @param materialized an instance of {@link Materialized} used to materialize a state store.
+ * Cannot be {@code null}.
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+ * represent the latest (rolling) aggregate for each key
+ */
+ KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+ final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
+
+ /**
+ * Aggregate the values of records in these streams by the grouped key.
+ * Records with {@code null} key or value are ignored.
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+ * that can be queried by the given store name in {@code materialized}.
+ * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+ * <p>
+ * To compute the aggregation the corresponding {@link Aggregator} as specified in
+ * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+ * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+ * processed to provide an initial intermediate aggregation result that is used to process the first record.
+ * The specified {@link Named} is used to name the processor combining the grouped streams.
+ * <p>
+ * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+ * same key.
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+ * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+ * <p>
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * <pre>{@code
+ * KafkaStreams streams = ... // some aggregation on value type Long
+ * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * String key = "some-key";
+ * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }</pre>
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+ * the value of the key on a parallel running instance of your Kafka Streams application.
+ * <p>
+ * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+ * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
+ * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+ * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+ * user-specified in {@link StreamsConfig} via parameter
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the provided store name defined
+ * in {@code Materialized}, and "-changelog" is a fixed suffix.
+ * <p>
+ * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+ *
+ * @param initializer an {@link Initializer} that computes an initial intermediate aggregation
+ * result. Cannot be {@code null}.
+ * @param materialized an instance of {@link Materialized} used to materialize a state store.
+ * Cannot be {@code null}.
+ * @param named a {@link Named} config used to name the processors. Cannot be {@code null}.
+ * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+ * represent the latest (rolling) aggregate for each key
+ */
+ KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+ final Named named,
+ final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 6b3c423..3f4e845 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -535,4 +535,22 @@ public interface KGroupedStream<K, V> {
*/
SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows);
+ /**
+ * Create a new {@link CogroupedKStream} from this grouped KStream to allow cogrouping other
+ * {@code KGroupedStream}s with it.
+ * {@link CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
+ * It is an intermediate representation after a grouping of {@link KStream}s, before the
+ * aggregations are applied to the new partitions resulting in a {@link KTable}.
+ * <p>
+ * The specified {@link Aggregator} is applied in the actual {@link CogroupedKStream#aggregate(Initializer)
+ * aggregation} step for each input record and computes a new aggregate using the current aggregate (or for the very
+ * first record per key using the initial intermediate aggregation result provided via the {@link Initializer} that
+ * is passed into {@link CogroupedKStream#aggregate(Initializer)}) and the record's value.
+ *
+ * @param aggregator an {@link Aggregator} that computes a new aggregate result
+ * @param <Vout> the type of the output values
+ * @return a {@link CogroupedKStream}
+ */
+ <Vout> CogroupedKStream<K, Vout> cogroup(final Aggregator<? super K, ? super V, Vout> aggregator);
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 255ccb7..8a602c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -2065,6 +2065,7 @@ public interface KStream<K, V> {
/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and default serializers and deserializers.
+ * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
@@ -2094,6 +2095,7 @@ public interface KStream<K, V> {
/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and using the serializers as defined by {@link Serialized}.
+ * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
@@ -2124,6 +2126,7 @@ public interface KStream<K, V> {
/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and using the serializers as defined by {@link Grouped}.
+ * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
@@ -2154,6 +2157,7 @@ public interface KStream<K, V> {
/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and default serializers and deserializers.
+ * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
@@ -2183,6 +2187,7 @@ public interface KStream<K, V> {
/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Serialized}.
+ * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
@@ -2215,6 +2220,7 @@ public interface KStream<K, V> {
/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Grouped}.
+ * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index f087b79..736d68f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import java.util.Collection;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -84,11 +85,12 @@ public abstract class AbstractStream<K, V> {
return builder.internalTopologyBuilder;
}
- Set<String> ensureJoinableWith(final AbstractStream<K, ?> other) {
- final Set<String> allSourceNodes = new HashSet<>();
+ Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ?>> otherStreams) {
+ final Set<String> allSourceNodes = new HashSet<>(sourceNodes);
allSourceNodes.addAll(sourceNodes);
- allSourceNodes.addAll(other.sourceNodes);
-
+ for (final AbstractStream<K, ?> other: otherStreams) {
+ allSourceNodes.addAll(other.sourceNodes);
+ }
builder.internalTopologyBuilder.copartitionSources(allSourceNodes);
return allSourceNodes;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
new file mode 100644
index 0000000..959d26e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+/** Collects cogrouped {@link KGroupedStream}s with their {@link Aggregator}s and builds the aggregated {@link KTable}. */
+public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> implements CogroupedKStream<K, VOut> {
+
+ static final String AGGREGATE_NAME = "COGROUPKSTREAM-AGGREGATE-";
+ static final String MERGE_NAME = "COGROUPKSTREAM-MERGE-";
+ /** Each cogrouped stream mapped to its aggregator; a LinkedHashMap (see constructor) keeps cogroup() call order. */
+ final private Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns;
+ final private CogroupedStreamAggregateBuilder<K, VOut> aggregateBuilder;
+ /** Starts with no streams registered; the two {@code null} super args are presumably the key/value serdes (TODO confirm). */
+ CogroupedKStreamImpl(final String name,
+ final Set<String> sourceNodes,
+ final StreamsGraphNode streamsGraphNode,
+ final InternalStreamsBuilder builder) {
+ super(name, null, null, sourceNodes, streamsGraphNode, builder);
+ groupPatterns = new LinkedHashMap<>();
+ aggregateBuilder = new CogroupedStreamAggregateBuilder<>(builder);
+ }
+ /** Registers {@code groupedStream} and its aggregator, returning {@code this}; the stream must be a KGroupedStreamImpl. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <VIn> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, VIn> groupedStream,
+ final Aggregator<? super K, ? super VIn, VOut> aggregator) {
+ Objects.requireNonNull(groupedStream, "groupedStream can't be null");
+ Objects.requireNonNull(aggregator, "aggregator can't be null");
+ groupPatterns.put((KGroupedStreamImpl<K, ?>) groupedStream,
+ (Aggregator<? super K, ? super Object, VOut>) aggregator);
+ return this;
+ }
+ /** Same as the three-argument overload with an empty {@link NamedInternal}. */
+ @Override
+ public KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+ final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized) {
+ return aggregate(initializer, NamedInternal.empty(), materialized);
+ }
+ /** Same as the three-argument overload with {@code Materialized.with(keySerde, null)} (no value serde given). */
+ @Override
+ public KTable<K, VOut> aggregate(final Initializer<VOut> initializer, final Named named) {
+ return aggregate(initializer, named, Materialized.with(keySerde, null));
+ }
+ /** Null-checks all arguments (NullPointerException), wraps named/materialized internally, then aggregates. */
+ @Override
+ public KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+ final Named named,
+ final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(initializer, "initializer can't be null");
+ Objects.requireNonNull(named, "named can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return doAggregate(
+ initializer,
+ new NamedInternal(named),
+ new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME));
+ }
+ /** Same as the two-argument Materialized overload with {@code Materialized.with(keySerde, null)}. */
+ @Override
+ public KTable<K, VOut> aggregate(final Initializer<VOut> initializer) {
+ return aggregate(initializer, Materialized.with(keySerde, null));
+ }
+ /** Builds the aggregation over all registered streams; the trailing {@code null}s are presumably unused window params (TODO confirm). */
+ private KTable<K, VOut> doAggregate(final Initializer<VOut> initializer,
+ final NamedInternal named,
+ final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+ return aggregateBuilder.build(
+ groupPatterns,
+ initializer,
+ named,
+ new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
+ materializedInternal.keySerde(),
+ materializedInternal.valueSerde(),
+ materializedInternal.queryableStoreName(),
+ null,
+ null,
+ null);
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
new file mode 100644
index 0000000..a3f2f1d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class CogroupedStreamAggregateBuilder<K, VOut> {
+ private final InternalStreamsBuilder builder;
+
+ CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
+ this.builder = builder;
+ }
+
+ <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+ final Initializer<VOut> initializer,
+ final NamedInternal named,
+ final StoreBuilder<? extends StateStore> storeBuilder,
+ final Serde<KR> keySerde,
+ final Serde<VOut> valSerde,
+ final String queryableName,
+ final Windows<W> windows,
+ final SessionWindows sessionWindows,
+ final Merger<? super K, VOut> sessionMerger) {
+
+ final Collection<? extends AbstractStream<K, ?>> groupedStreams = new ArrayList<>(groupPatterns.keySet());
+ final AbstractStream<K, ?> kGrouped = groupedStreams.iterator().next();
+ groupedStreams.remove(kGrouped);
+ kGrouped.ensureCopartitionWith(groupedStreams);
+
+ final Collection<StreamsGraphNode> processors = new ArrayList<>();
+ boolean stateCreated = false;
+ int counter = 0;
+ for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+ final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(
+ kGroupedStream.getValue(),
+ initializer,
+ named.suffixWithOrElseGet(
+ "-cogroup-agg-" + counter++,
+ builder,
+ CogroupedKStreamImpl.AGGREGATE_NAME),
+ stateCreated,
+ storeBuilder,
+ windows,
+ sessionWindows,
+ sessionMerger);
+ stateCreated = true;
+ processors.add(statefulProcessorNode);
+ builder.addGraphNode(kGroupedStream.getKey().streamsGraphNode, statefulProcessorNode);
+ }
+ final String mergeProcessorName = named.suffixWithOrElseGet(
+ "-cogroup-merge",
+ builder,
+ CogroupedKStreamImpl.MERGE_NAME);
+ final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();
+ final ProcessorGraphNode<K, VOut> mergeNode =
+ new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
+
+ builder.addGraphNode(processors, mergeNode);
+
+ return new KTableImpl<KR, VIn, VOut>(
+ mergeProcessorName,
+ keySerde,
+ valSerde,
+ Collections.singleton(mergeNode.nodeName()),
+ queryableName,
+ passThrough,
+ mergeNode,
+ builder);
+ }
+
+ private <W extends Window> StatefulProcessorNode getStatefulProcessorNode(final Aggregator<? super K, ? super Object, VOut> aggregator,
+ final Initializer<VOut> initializer,
+ final String processorName,
+ final boolean stateCreated,
+ final StoreBuilder<? extends StateStore> storeBuilder,
+ final Windows<W> windows,
+ final SessionWindows sessionWindows,
+ final Merger<? super K, VOut> sessionMerger) {
+
+ final ProcessorSupplier<K, ?> kStreamAggregate;
+
+ if (windows == null && sessionWindows == null) {
+ kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
+ } else if (windows != null && sessionWindows == null) {
+ kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
+ } else if (windows == null && sessionMerger != null) {
+ kStreamAggregate = new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, aggregator, sessionMerger);
+ } else {
+ throw new IllegalArgumentException("must include windows OR sessionWindows + sessionMerger OR all must be null");
+ }
+
+ final StatefulProcessorNode<K, ?> statefulProcessorNode;
+ if (!stateCreated) {
+ statefulProcessorNode =
+ new StatefulProcessorNode<>(
+ processorName,
+ new ProcessorParameters<>(kStreamAggregate, processorName),
+ storeBuilder
+ );
+ } else {
+ statefulProcessorNode =
+ new StatefulProcessorNode<>(
+ processorName,
+ new ProcessorParameters<>(kStreamAggregate, processorName),
+ new String[]{storeBuilder.name()}
+ );
+ }
+ return statefulProcessorNode;
+ }
+
+
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 5751105..7884f30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -223,4 +224,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
materializedInternal.keySerde(),
materializedInternal.valueSerde());
}
+
+    /**
+     * Starts a cogroup with this grouped stream as the first input.
+     * The new {@link CogroupedKStreamImpl} shares this stream's name, source nodes, graph node and builder.
+     *
+     * @param aggregator aggregator applied to this stream's records; must not be {@code null}
+     * @throws NullPointerException if {@code aggregator} is {@code null}
+     */
+    @Override
+    public <VOut> CogroupedKStream<K, VOut> cogroup(final Aggregator<? super K, ? super V, VOut> aggregator) {
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        return new CogroupedKStreamImpl<K, VOut>(name, sourceNodes, streamsGraphNode, builder)
+            .cogroup(this, aggregator);
+    }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bca4942..870cf53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -368,7 +368,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
for (int i = 0; i < predicates.length; i++) {
- final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
+ final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new PassThrough<K, V>(), childNames[i]);
final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], innerProcessorParameters);
builder.addGraphNode(branchNode, branchChildNode);
@@ -401,7 +401,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
allSourceNodes.addAll(sourceNodes);
allSourceNodes.addAll(streamImpl.sourceNodes);
- final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name);
+ final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new PassThrough<>(), name);
final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name, processorParameters);
mergeNode.setMergeNode(true);
@@ -796,7 +796,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
}
- joinThis.ensureJoinableWith(joinOther);
+ joinThis.ensureCopartitionWith(Collections.singleton(joinOther));
return join.join(
joinThis,
@@ -1037,7 +1037,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(other, "other KTable can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
+ final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) other));
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index b77e064..9affbe9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -133,7 +133,7 @@ class KStreamImplJoin {
rightOuter
);
- final KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
+ final PassThrough<K1, R> joinMerge = new PassThrough<>();
final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K1, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f6f2ada..052a5f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -677,7 +677,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final NamedInternal renamed = new NamedInternal(joinName);
final String joinMergeName = renamed.orElseGenerateWithPrefix(builder, MERGE_NAME);
- final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
+ final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) other));
if (leftOuter) {
enableSendingOldValues();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
similarity index 84%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
index 2afe507..b83b3a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
@@ -20,14 +20,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-class KStreamPassThrough<K, V> implements ProcessorSupplier<K, V> {
+class PassThrough<K, V> implements ProcessorSupplier<K, V> {
@Override
public Processor<K, V> get() {
- return new KStreamPassThroughProcessor<>();
+ return new PassThroughProcessor<>();
}
- private static final class KStreamPassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
+ private static final class PassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
@Override
public void process(final K key, final V value) {
context().forward(key, value);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
new file mode 100644
index 0000000..ad16879
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class CogroupedKStreamImplTest {
+    private static final String TOPIC = "topic";
+    private static final String OUTPUT = "output";
+
+    // Appends each incoming value to the running String aggregate.
+    private static final Aggregator<String, String, String> STRING_AGGREGATOR =
+        (aggKey, newValue, current) -> current + newValue;
+
+    // Starts String aggregates from the empty string.
+    private static final Initializer<String> STRING_INITIALIZER = () -> "";
+
+    // Parses each String value as an int and adds it to the running sum.
+    private static final Aggregator<String, String, Integer> STRING_SUM_AGGREGATOR =
+        (aggKey, newValue, current) -> current + Integer.parseInt(newValue);
+
+    // Adds each Integer value to the running sum.
+    private static final Aggregator<? super String, ? super Integer, Integer> SUM_AGGREGATOR =
+        (aggKey, newValue, current) -> current + newValue;
+
+    // Starts Integer sums from zero.
+    private static final Initializer<Integer> SUM_INITIALIZER = () -> 0;
+
+    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
+    private final StreamsBuilder builder = new StreamsBuilder();
+    private KGroupedStream<String, String> groupedStream;
+    private CogroupedKStream<String, String> cogroupedStream;
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+
+
+    @Before
+    public void setup() {
+        // Uses its own builder rather than the `builder` field. Adding this source to the field would
+        // show up in, and shift the generated node names of, topologies that individual tests build
+        // and describe.
+        final StreamsBuilder setupBuilder = new StreamsBuilder();
+        final KStream<String, String> stream = setupBuilder.stream(TOPIC, stringConsumed);
+
+        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER);
+    }
+
+    // Every cogroup/aggregate overload must reject null arguments with a NullPointerException.
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEInCogroupIfKGroupedStreamIsNull() {
+        cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullAggregatorOnCogroup() {
+        cogroupedStream.cogroup(groupedStream, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregate() {
+        cogroupedStream.aggregate(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregateWithNamed() {
+        cogroupedStream.aggregate(null, Named.as("name"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregateWithMaterialized() {
+        cogroupedStream.aggregate(null, Materialized.as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregateWithNamedAndMaterialized() {
+        cogroupedStream.aggregate(null, Named.as("name"), Materialized.as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullNamedOnAggregate() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullMaterializedOnAggregate() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullNamedOnAggregateWithMaterialized() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, null, Materialized.as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullMaterializedOnAggregateWithNamed() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), null);
+    }
+
+    @Test
+    public void shouldNameProcessorsAndStoreBasedOnNamedParameter() {
+        // Builder calls stay in this order: generated node indexes depend on it.
+        final KStream<String, String> sourceOne = builder.stream("one", stringConsumed);
+        final KStream<String, String> sourceTwo = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> groupedOne = sourceOne.groupByKey();
+        final KGroupedStream<String, String> groupedTwo = sourceTwo.groupByKey();
+
+        final CogroupedKStream<String, String> cogrouped = groupedOne
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(groupedTwo, STRING_AGGREGATOR);
+        final KTable<String, String> aggregated =
+            cogrouped.aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store"));
+        aggregated.toStream().to(OUTPUT);
+
+        final String actualDescription = builder.build().describe().toString();
+
+        // Both aggregate processors share store "store"; processor names derive from Named "test".
+        final String expectedDescription = "Topologies:\n" +
+            " Sub-topology: 0\n" +
+            " Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n" +
+            " --> test-cogroup-agg-0\n" +
+            " Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n" +
+            " --> test-cogroup-agg-1\n" +
+            " Processor: test-cogroup-agg-0 (stores: [store])\n" +
+            " --> test-cogroup-merge\n" +
+            " <-- KSTREAM-SOURCE-0000000000\n" +
+            " Processor: test-cogroup-agg-1 (stores: [store])\n" +
+            " --> test-cogroup-merge\n" +
+            " <-- KSTREAM-SOURCE-0000000001\n" +
+            " Processor: test-cogroup-merge (stores: [])\n" +
+            " --> KTABLE-TOSTREAM-0000000005\n" +
+            " <-- test-cogroup-agg-0, test-cogroup-agg-1\n" +
+            " Processor: KTABLE-TOSTREAM-0000000005 (stores: [])\n" +
+            " --> KSTREAM-SINK-0000000006\n" +
+            " <-- test-cogroup-merge\n" +
+            " Sink: KSTREAM-SINK-0000000006 (topic: output)\n" +
+            " <-- KTABLE-TOSTREAM-0000000005\n\n";
+
+        assertThat(actualDescription, equalTo(expectedDescription));
+    }
+
+    @Test
+    public void shouldCogroupAndAggregateSingleKStreams() {
+        final KStream<String, String> source = builder.stream("one", stringConsumed);
+        final KGroupedStream<String, String> grouped = source.groupByKey();
+
+        // A single cogrouped input: values are concatenated per key.
+        final CogroupedKStream<String, String> cogrouped = grouped.cogroup(STRING_AGGREGATOR);
+        final KTable<String, String> aggregated = cogrouped.aggregate(STRING_INITIALIZER);
+        aggregated.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            input.pipeInput("k1", "A", 0);
+            input.pipeInput("k2", "B", 0);
+            input.pipeInput("k2", "B", 0);
+            input.pipeInput("k1", "A", 0);
+
+            assertOutputKeyValueTimestamp(output, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(output, "k2", "B", 0);
+            assertOutputKeyValueTimestamp(output, "k2", "BB", 0);
+            assertOutputKeyValueTimestamp(output, "k1", "AA", 0);
+        }
+    }
+
+    @Test
+    public void testCogroupHandleNullValues() {
+        final KStream<String, String> source = builder.stream("one", stringConsumed);
+        final KGroupedStream<String, String> grouped = source.groupByKey();
+
+        final CogroupedKStream<String, String> cogrouped = grouped.cogroup(STRING_AGGREGATOR);
+        final KTable<String, String> aggregated = cogrouped.aggregate(STRING_INITIALIZER);
+        aggregated.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> input =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            input.pipeInput("k1", "A", 0);
+            input.pipeInput("k2", "B", 0);
+            // Null value for k2: expected to produce no output and leave the aggregate unchanged.
+            input.pipeInput("k2", null, 0);
+            input.pipeInput("k2", "B", 0);
+            input.pipeInput("k1", "A", 0);
+
+            assertOutputKeyValueTimestamp(output, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(output, "k2", "B", 0);
+            assertOutputKeyValueTimestamp(output, "k2", "BB", 0);
+            assertOutputKeyValueTimestamp(output, "k1", "AA", 0);
+        }
+    }
+
+    @Test
+    public void shouldCogroupAndAggregateTwoKStreamsWithDistinctKeys() {
+        final KStream<String, String> sourceOne = builder.stream("one", stringConsumed);
+        final KStream<String, String> sourceTwo = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> groupedOne = sourceOne.groupByKey();
+        final KGroupedStream<String, String> groupedTwo = sourceTwo.groupByKey();
+
+        final CogroupedKStream<String, String> cogrouped = groupedOne
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(groupedTwo, STRING_AGGREGATOR);
+        final KTable<String, String> aggregated = cogrouped.aggregate(STRING_INITIALIZER);
+        aggregated.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputOne =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> inputTwo =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            // k1 only arrives on topic "one", k2 only on topic "two".
+            inputOne.pipeInput("k1", "A", 0);
+            inputOne.pipeInput("k1", "A", 1);
+            inputOne.pipeInput("k1", "A", 10);
+            inputOne.pipeInput("k1", "A", 100);
+            inputTwo.pipeInput("k2", "B", 100L);
+            inputTwo.pipeInput("k2", "B", 200L);
+            inputTwo.pipeInput("k2", "B", 1L);
+            inputTwo.pipeInput("k2", "B", 500L);
+            inputTwo.pipeInput("k2", "B", 500L);
+            inputTwo.pipeInput("k2", "B", 100L);
+
+            assertOutputKeyValueTimestamp(output, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(output, "k1", "AA", 1);
+            assertOutputKeyValueTimestamp(output, "k1", "AAA", 10);
+            assertOutputKeyValueTimestamp(output, "k1", "AAAA", 100);
+            assertOutputKeyValueTimestamp(output, "k2", "B", 100);
+            assertOutputKeyValueTimestamp(output, "k2", "BB", 200);
+            // Out-of-order record (t=1) does not lower the emitted timestamp.
+            assertOutputKeyValueTimestamp(output, "k2", "BBB", 200);
+            assertOutputKeyValueTimestamp(output, "k2", "BBBB", 500);
+            assertOutputKeyValueTimestamp(output, "k2", "BBBBB", 500);
+            assertOutputKeyValueTimestamp(output, "k2", "BBBBBB", 500);
+        }
+    }
+
+    @Test
+    public void shouldCogroupAndAggregateTwoKStreamsWithSharedKeys() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+
+        final KTable<String, String> customers = grouped1
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(grouped2, STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER);
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "A", 0L);
+            testInputTopic.pipeInput("k2", "A", 1L);
+            testInputTopic.pipeInput("k1", "A", 10L);
+            testInputTopic.pipeInput("k2", "A", 100L);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+            testInputTopic2.pipeInput("k2", "B", 200L);
+            testInputTopic2.pipeInput("k1", "B", 1L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic2.pipeInput("k1", "B", 500L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic2.pipeInput("k3", "B", 500L);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+
+            // Records from both inputs update the same per-key aggregate; the emitted timestamp is
+            // the largest seen so far for that key.
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "A", 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AA", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AAB", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABB", 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAB", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AABB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBB", 500);
+            // Previously unchecked: the k3 record and the final, out-of-order k2 record.
+            assertOutputKeyValueTimestamp(testOutputTopic, "k3", "B", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBBB", 500);
+        }
+    }
+
+    @Test
+    public void shouldAllowDifferentOutputTypeInCoGroup() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+
+        // String inputs are summed into an Integer aggregate, so the store needs an Integer value serde.
+        final KTable<String, Integer> customers = grouped1
+            .cogroup(STRING_SUM_AGGREGATOR)
+            .cogroup(grouped2, STRING_SUM_AGGREGATOR)
+            .aggregate(
+                SUM_INITIALIZER,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("store1")
+                    .withValueSerde(Serdes.Integer()));
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, Integer> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());
+
+            testInputTopic.pipeInput("k1", "1", 0L);
+            testInputTopic.pipeInput("k2", "1", 1L);
+            testInputTopic.pipeInput("k1", "1", 10L);
+            testInputTopic.pipeInput("k2", "1", 100L);
+            testInputTopic2.pipeInput("k2", "2", 100L);
+            testInputTopic2.pipeInput("k2", "2", 200L);
+            testInputTopic2.pipeInput("k1", "2", 1L);
+            testInputTopic2.pipeInput("k2", "2", 500L);
+            testInputTopic2.pipeInput("k1", "2", 500L);
+            testInputTopic2.pipeInput("k2", "3", 500L);
+            testInputTopic2.pipeInput("k3", "2", 500L);
+            testInputTopic2.pipeInput("k2", "2", 100L);
+
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 1, 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 1, 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 2, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 2, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 4, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 6, 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 4, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 8, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 6, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 11, 500);
+            // Previously unchecked: the k3 record and the final, out-of-order k2 record.
+            assertOutputKeyValueTimestamp(testOutputTopic, "k3", 2, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 13, 500);
+        }
+    }
+
+    @Test
+    public void shouldCoGroupStreamsWithDifferentInputTypes() {
+        final Consumed<String, Integer> integerConsumed = Consumed.with(Serdes.String(), Serdes.Integer());
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, Integer> stream2 = builder.stream("two", integerConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, Integer> grouped2 = stream2.groupByKey();
+
+        // A String-valued and an Integer-valued stream feed the same Integer aggregate.
+        final KTable<String, Integer> customers = grouped1
+            .cogroup(STRING_SUM_AGGREGATOR)
+            .cogroup(grouped2, SUM_AGGREGATOR)
+            .aggregate(
+                SUM_INITIALIZER,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("store1")
+                    .withValueSerde(Serdes.Integer()));
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic = driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, Integer> testInputTopic2 = driver.createInputTopic("two", new StringSerializer(), new IntegerSerializer());
+            final TestOutputTopic<String, Integer> testOutputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());
+            testInputTopic.pipeInput("k1", "1", 0L);
+            testInputTopic.pipeInput("k2", "1", 1L);
+            testInputTopic.pipeInput("k1", "1", 10L);
+            testInputTopic.pipeInput("k2", "1", 100L);
+
+            testInputTopic2.pipeInput("k2", 2, 100L);
+            testInputTopic2.pipeInput("k2", 2, 200L);
+            testInputTopic2.pipeInput("k1", 2, 1L);
+            testInputTopic2.pipeInput("k2", 2, 500L);
+            testInputTopic2.pipeInput("k1", 2, 500L);
+            testInputTopic2.pipeInput("k2", 3, 500L);
+            testInputTopic2.pipeInput("k3", 2, 500L);
+            testInputTopic2.pipeInput("k2", 2, 100L);
+
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 1, 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 1, 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 2, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 2, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 4, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 6, 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 4, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 8, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 6, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 11, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k3", 2, 500);
+            // Previously unchecked: the final, out-of-order k2 record keeps timestamp 500.
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 13, 500);
+        }
+    }
+
+    @Test
+    public void testCogroupKeyMixedAggregators() {
+        // Two inputs feed one aggregate per key: "one" goes through TOSTRING_REMOVER,
+        // "two" through TOSTRING_ADDER; the result is materialized in "store1" with String values.
+        final KStream<String, String> inputOne = builder.stream("one", stringConsumed);
+        final KStream<String, String> inputTwo = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> groupedOne = inputOne.groupByKey();
+        final KGroupedStream<String, String> groupedTwo = inputTwo.groupByKey();
+
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeConfig =
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store1")
+                .withValueSerde(Serdes.String());
+
+        final KTable<String, String> aggregated = groupedOne
+            .cogroup(MockAggregator.TOSTRING_REMOVER)
+            .cogroup(groupedTwo, MockAggregator.TOSTRING_ADDER)
+            .aggregate(MockInitializer.STRING_INIT, storeConfig);
+
+        aggregated.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> topicOne =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> topicTwo =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            topicOne.pipeInput("k1", "1", 0L);
+            topicOne.pipeInput("k2", "1", 1L);
+            topicOne.pipeInput("k1", "1", 10L);
+            topicOne.pipeInput("k2", "1", 100L);
+            topicTwo.pipeInput("k1", "2", 500L);
+            topicTwo.pipeInput("k2", "2", 500L);
+            topicTwo.pipeInput("k1", "2", 500L);
+            topicTwo.pipeInput("k2", "2", 100L);
+
+            // In the expected values, updates from "one" appear as "-1" suffixes and updates from "two" as "+2".
+            assertOutputKeyValueTimestamp(output, "k1", "0-1", 0L);
+            assertOutputKeyValueTimestamp(output, "k2", "0-1", 1L);
+            assertOutputKeyValueTimestamp(output, "k1", "0-1-1", 10L);
+            assertOutputKeyValueTimestamp(output, "k2", "0-1-1", 100L);
+            assertOutputKeyValueTimestamp(output, "k1", "0-1-1+2", 500L);
+            assertOutputKeyValueTimestamp(output, "k2", "0-1-1+2", 500L);
+            assertOutputKeyValueTimestamp(output, "k1", "0-1-1+2+2", 500L);
+            // The last k2 input is stamped 100, yet the expected result keeps timestamp 500.
+            assertOutputKeyValueTimestamp(output, "k2", "0-1-1+2+2", 500L);
+        }
+    }
+
+    @Test
+    public void testCogroupWithThreeGroupedStreams() {
+        // Three inputs are cogrouped with the same STRING_AGGREGATOR into a single KTable.
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+        final KStream<String, String> stream3 = builder.stream("three", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+        final KGroupedStream<String, String> grouped3 = stream3.groupByKey();
+
+        final KTable<String, String> customers = grouped1
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(grouped2, STRING_AGGREGATOR)
+            .cogroup(grouped3, STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER);
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic3 =
+                driver.createInputTopic("three", new StringSerializer(), new StringSerializer());
+
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "A", 0L);
+            testInputTopic.pipeInput("k2", "A", 1L);
+            testInputTopic.pipeInput("k1", "A", 10L);
+            testInputTopic.pipeInput("k2", "A", 100L);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+            testInputTopic2.pipeInput("k2", "B", 200L);
+            testInputTopic2.pipeInput("k1", "B", 1L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic3.pipeInput("k1", "B", 500L);
+            testInputTopic3.pipeInput("k2", "B", 500L);
+            testInputTopic3.pipeInput("k3", "B", 500L);
+            testInputTopic3.pipeInput("k2", "B", 100L);
+
+            // One update is expected per input record, in input order.
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "A", 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AA", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AAB", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABB", 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAB", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AABB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k3", "B", 500);
+            // The twelfth input (k2 "B" @ 100) also updates the aggregate. The expected timestamp
+            // stays 500, matching the out-of-order case in testCogroupKeyMixedAggregators.
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBBB", 500);
+            // Every emitted record must have been verified.
+            assertThat(testOutputTopic.isEmpty(), equalTo(true));
+        }
+    }
+
+ private void assertOutputKeyValueTimestamp(final TestOutputTopic<String, String> outputTopic,
+ final String expectedKey,
+ final String expectedValue,
+ final long expectedTimestamp) {
+ assertThat(
+ outputTopic.readRecord(),
+ equalTo(new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp)));
+ }
+
+ private void assertOutputKeyValueTimestamp(final TestOutputTopic<String, Integer> outputTopic,
+ final String expectedKey,
+ final Integer expectedValue,
+ final long expectedTimestamp) {
+ assertThat(
+ outputTopic.readRecord(),
+ equalTo(new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp)));
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 7cfc8c2..f2e6e1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -82,6 +82,11 @@ public class KGroupedStreamImplTest {
}
@Test(expected = NullPointerException.class)
+ public void shouldNotHaveNullAggregatorOnCogroup() {
+ // cogroup() is expected to reject a null aggregator by throwing NullPointerException.
+ groupedStream.cogroup(null);
+ }
+
+ @Test(expected = NullPointerException.class)
public void shouldNotHaveNullReducerOnReduce() {
+ // reduce() is expected to reject a null reducer by throwing NullPointerException.
groupedStream.reduce(null);
}
@@ -471,8 +476,6 @@ public class KGroupedStreamImplTest {
}
}
-
- @SuppressWarnings("unchecked")
@Test
public void shouldReduceAndMaterializeResults() {
groupedStream.reduce(
@@ -544,8 +547,6 @@ public class KGroupedStreamImplTest {
}
}
-
- @SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
groupedStream.aggregate(
@@ -575,7 +576,6 @@ public class KGroupedStreamImplTest {
}
}
- @SuppressWarnings("unchecked")
@Test
public void shouldAggregateWithDefaultSerdes() {
final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();