Posted to commits@kafka.apache.org by mj...@apache.org on 2019/12/01 03:37:32 UTC

[kafka] branch trunk updated: KAFKA-6049: Add non-windowed Cogroup operator (KIP-150) (#7538)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b8ea7e  KAFKA-6049: Add non-windowed Cogroup operator (KIP-150) (#7538)
0b8ea7e is described below

commit 0b8ea7e162e68662fbda6893d188862999d9b427
Author: wcarlson5 <18...@users.noreply.github.com>
AuthorDate: Sat Nov 30 19:37:04 2019 -0800

    KAFKA-6049: Add non-windowed Cogroup operator (KIP-150) (#7538)
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>
---
 .../kafka/streams/kstream/CogroupedKStream.java    | 268 ++++++++++
 .../kafka/streams/kstream/KGroupedStream.java      |  18 +
 .../org/apache/kafka/streams/kstream/KStream.java  |   6 +
 .../streams/kstream/internals/AbstractStream.java  |  10 +-
 .../kstream/internals/CogroupedKStreamImpl.java    | 107 ++++
 .../internals/CogroupedStreamAggregateBuilder.java | 146 ++++++
 .../kstream/internals/KGroupedStreamImpl.java      |   8 +
 .../streams/kstream/internals/KStreamImpl.java     |   8 +-
 .../streams/kstream/internals/KStreamImplJoin.java |   2 +-
 .../streams/kstream/internals/KTableImpl.java      |   2 +-
 .../{KStreamPassThrough.java => PassThrough.java}  |   6 +-
 .../internals/CogroupedKStreamImplTest.java        | 551 +++++++++++++++++++++
 .../kstream/internals/KGroupedStreamImplTest.java  |  10 +-
 13 files changed, 1124 insertions(+), 18 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
new file mode 100644
index 0000000..a9b66d8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+/**
+ * {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
+ * <p>
+ * It is an intermediate representation after a grouping of {@link KStream}s, before the
+ * aggregations are applied to the new partitions resulting in a {@link KTable}.
+ * <p>
+ * A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
+ * {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
+ *
+ * @param <K> Type of keys
+ * @param <VOut> Type of values after aggregation
+ */
+public interface CogroupedKStream<K, VOut> {
+
+    /**
+     * Add an already {@link KGroupedStream grouped KStream} to this {@code CogroupedKStream}.
+     * <p>
+     * The added {@link KGroupedStream grouped KStream} must have the same number of partitions as all existing
+     * streams of this {@code CogroupedKStream}.
+     * If this is not the case, you would need to call {@link KStream#through(String)} before
+     * {@link KStream#groupByKey() grouping} the {@link KStream}, using a pre-created topic with the "correct" number of
+     * partitions.
+     * <p>
+     * The specified {@link Aggregator} is applied in the actual {@link #aggregate(Initializer) aggregation} step for
+     * each input record and computes a new aggregate using the current aggregate (or for the very first record per key
+     * using the initial intermediate aggregation result provided via the {@link Initializer} that is passed into
+     * {@link #aggregate(Initializer)}) and the record's value.
+     *
+     * @param groupedStream a grouped stream
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param <VIn> Type of input values
+     * @return a {@code CogroupedKStream}
+     */
+    <VIn> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, VIn> groupedStream,
+                                            final Aggregator<? super K, ? super VIn, VOut> aggregator);
+
+    /**
+     * Aggregate the values of records in these streams by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried by the store name, which for this overload is generated internally.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * To compute the aggregation the corresponding {@link Aggregator} as specified in
+     * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+     * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+     * same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type long
+     * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+     * the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot
+     * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is a generated value, and
+     * "-changelog" is a fixed suffix.
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * @param initializer  an {@link Initializer} that computes an initial intermediate aggregation
+     *                     result. Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+     * represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, VOut> aggregate(final Initializer<VOut> initializer);
+
+    /**
+     * Aggregate the values of records in these streams by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried by the store name, which for this overload is generated internally.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * To compute the aggregation the corresponding {@link Aggregator} as specified in
+     * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+     * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Named} is used to name the processor combining the grouped streams.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+     * same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type long
+     * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+     * the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot
+     * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is a generated value, and
+     * "-changelog" is a fixed suffix.
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * @param initializer  an {@link Initializer} that computes an initial intermediate aggregation
+     *                     result. Cannot be {@code null}.
+     * @param named        a {@link Named} config used to name the processor. Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+     * represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+                              final Named named);
+
+    /**
+     * Aggregate the values of records in these streams by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried by the given store name in {@code materialized}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * To compute the aggregation the corresponding {@link Aggregator} as specified in
+     * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+     * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+     * same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type long
+     * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+     * the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
+     * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the provided store name defined
+     * in {@code Materialized}, and "-changelog" is a fixed suffix.
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * @param initializer  an {@link Initializer} that computes an initial intermediate aggregation
+     *                     result. Cannot be {@code null}.
+     * @param materialized an instance of {@link Materialized} used to materialize a state store.
+     *                     Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+     * represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+                              final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Aggregate the values of records in these streams by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried by the given store name in {@code materialized}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * To compute the aggregation the corresponding {@link Aggregator} as specified in
+     * {@link #cogroup(KGroupedStream, Aggregator) cogroup(...)} is used per input stream.
+     * The specified {@link Initializer} is applied once per key, directly before the first input record per key is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Named} is used to name the processor combining the grouped streams.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the
+     * same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type long
+     * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
+     * KeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to query
+     * the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot
+     * contain characters other than ASCII alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the provided store name defined
+     * in {@code Materialized}, and "-changelog" is a fixed suffix.
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * @param initializer  an {@link Initializer} that computes an initial intermediate aggregation
+     *                     result. Cannot be {@code null}.
+     * @param materialized an instance of {@link Materialized} used to materialize a state store.
+     *                     Cannot be {@code null}.
+     * @param named        a {@link Named} config used to name the processors. Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that
+     * represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+                              final Named named,
+                              final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
+
+}
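
For orientation, the new interface composes as in the following minimal sketch
(topic names, serdes, and aggregator logic are illustrative only, not part of
this commit):

    StreamsBuilder builder = new StreamsBuilder();

    // Two independently grouped streams with different value types ...
    KGroupedStream<String, Long> clicks = builder
        .stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()))
        .groupByKey();
    KGroupedStream<String, String> logins = builder
        .stream("logins", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey();

    // ... cogrouped into a single KTable<String, Long>: each input stream
    // brings its own Aggregator, and all share the output type VOut (Long).
    KTable<String, Long> activity = clicks
        .cogroup((key, value, agg) -> agg + value)        // Aggregator<String, Long, Long>
        .cogroup(logins, (key, value, agg) -> agg + 1L)   // Aggregator<String, String, Long>
        .aggregate(
            () -> 0L,
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("activity-store"));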
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 6b3c423..3f4e845 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -535,4 +535,22 @@ public interface KGroupedStream<K, V> {
      */
     SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows);
 
+    /**
+     * Create a new {@link CogroupedKStream} from this grouped KStream to allow cogrouping other
+     * {@code KGroupedStream}s with it.
+     * {@link CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
+     * It is an intermediate representation after a grouping of {@link KStream}s, before the
+     * aggregations are applied to the new partitions resulting in a {@link KTable}.
+     * <p>
+     * The specified {@link Aggregator} is applied in the actual {@link CogroupedKStream#aggregate(Initializer)
+     * aggregation} step for each input record and computes a new aggregate using the current aggregate (or for the very
+     * first record per key using the initial intermediate aggregation result provided via the {@link Initializer} that
+     * is passed into {@link CogroupedKStream#aggregate(Initializer)}) and the record's value.
+     *
+     * @param aggregator an {@link Aggregator} that computes a new aggregate result
+     * @param <Vout> the type of the output values
+     * @return a {@link CogroupedKStream}
+     */
+    <Vout> CogroupedKStream<K, Vout> cogroup(final Aggregator<? super K, ? super V, Vout> aggregator);
+
 }
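
Even a single grouped stream can go through the new entry point; the effect is
equivalent to a plain groupByKey().aggregate(...) (cf. the new test
shouldCogroupAndAggregateSingleKStreams below). Illustrative sketch:

    KTable<String, String> concatenated = builder
        .stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .cogroup((key, value, agg) -> agg + value)
        .aggregate(() -> "");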
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 255ccb7..8a602c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -2065,6 +2065,7 @@ public interface KStream<K, V> {
     /**
      * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
      * and default serializers and deserializers.
+     * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
      * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
@@ -2094,6 +2095,7 @@ public interface KStream<K, V> {
     /**
      * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
      * and using the serializers as defined by {@link Serialized}.
+     * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
      * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
@@ -2124,6 +2126,7 @@ public interface KStream<K, V> {
     /**
      * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
      * and using the serializers as defined by {@link Grouped}.
+     * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
      * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
@@ -2154,6 +2157,7 @@ public interface KStream<K, V> {
     /**
      * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
      * and default serializers and deserializers.
+     * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
      * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
@@ -2183,6 +2187,7 @@ public interface KStream<K, V> {
     /**
      * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
      * and {@link Serde}s as specified by {@link Serialized}.
+     * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
      * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
@@ -2215,6 +2220,7 @@ public interface KStream<K, V> {
     /**
      * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
      * and {@link Serde}s as specified by {@link Grouped}.
+     * {@link KGroupedStream} can be further grouped with other streams to form a {@link CogroupedKStream}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
      * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index f087b79..736d68f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Collection;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -84,11 +85,12 @@ public abstract class AbstractStream<K, V> {
         return builder.internalTopologyBuilder;
     }
 
-    Set<String> ensureJoinableWith(final AbstractStream<K, ?> other) {
-        final Set<String> allSourceNodes = new HashSet<>();
+    Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ?>> otherStreams) {
+        final Set<String> allSourceNodes = new HashSet<>(sourceNodes);
         allSourceNodes.addAll(sourceNodes);
-        allSourceNodes.addAll(other.sourceNodes);
-
+        for (final AbstractStream<K, ?> other: otherStreams) {
+            allSourceNodes.addAll(other.sourceNodes);
+        }
         builder.internalTopologyBuilder.copartitionSources(allSourceNodes);
 
         return allSourceNodes;
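
The rename from ensureJoinableWith to ensureCopartitionWith generalizes the
copartitioning check from exactly one other stream to arbitrarily many. The two
call-site shapes after this change (both visible in the hunks below):

    // joins wrap their single counterpart:
    joinThis.ensureCopartitionWith(Collections.singleton(joinOther));

    // the cogroup builder passes all remaining grouped streams at once:
    kGrouped.ensureCopartitionWith(groupedStreams);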
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
new file mode 100644
index 0000000..959d26e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> implements CogroupedKStream<K, VOut> {
+
+    static final String AGGREGATE_NAME = "COGROUPKSTREAM-AGGREGATE-";
+    static final String MERGE_NAME = "COGROUPKSTREAM-MERGE-";
+
+    private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns;
+    private final CogroupedStreamAggregateBuilder<K, VOut> aggregateBuilder;
+
+    CogroupedKStreamImpl(final String name,
+                         final Set<String> sourceNodes,
+                         final StreamsGraphNode streamsGraphNode,
+                         final InternalStreamsBuilder builder) {
+        super(name, null, null, sourceNodes, streamsGraphNode, builder);
+        groupPatterns = new LinkedHashMap<>();
+        aggregateBuilder = new CogroupedStreamAggregateBuilder<>(builder);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <VIn> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, VIn> groupedStream,
+                                                   final Aggregator<? super K, ? super VIn, VOut> aggregator) {
+        Objects.requireNonNull(groupedStream, "groupedStream can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        groupPatterns.put((KGroupedStreamImpl<K, ?>) groupedStream,
+                          (Aggregator<? super K, ? super Object, VOut>) aggregator);
+        return this;
+    }
+
+    @Override
+    public KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+                                     final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<K, VOut> aggregate(final Initializer<VOut> initializer, final Named named) {
+        return aggregate(initializer, named, Materialized.with(keySerde, null));
+    }
+
+    @Override
+    public KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
+                                     final Named named,
+                                     final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return doAggregate(
+            initializer,
+            new NamedInternal(named),
+            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME));
+    }
+
+    @Override
+    public KTable<K, VOut> aggregate(final Initializer<VOut> initializer) {
+        return aggregate(initializer, Materialized.with(keySerde, null));
+    }
+
+    private KTable<K, VOut> doAggregate(final Initializer<VOut> initializer,
+                                        final NamedInternal named,
+                                        final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+        return aggregateBuilder.build(
+            groupPatterns,
+            initializer,
+            named,
+            new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
+            materializedInternal.keySerde(),
+            materializedInternal.valueSerde(),
+            materializedInternal.queryableStoreName(),
+            null,
+            null,
+            null);
+    }
+}
\ No newline at end of file
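
Note that all four aggregate() overloads funnel into doAggregate(), and the
overloads without a Materialized argument pass Materialized.with(keySerde, null),
i.e. the value serde is resolved from the application's default serde at
runtime. A caller relying on those overloads would thus configure something
like (property values here are hypothetical):

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cogroup-example");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());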
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
new file mode 100644
index 0000000..a3f2f1d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class CogroupedStreamAggregateBuilder<K, VOut> {
+    private final InternalStreamsBuilder builder;
+
+    CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
+        this.builder = builder;
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                       final Initializer<VOut> initializer,
+                                                       final NamedInternal named,
+                                                       final StoreBuilder<? extends StateStore> storeBuilder,
+                                                       final Serde<KR> keySerde,
+                                                       final Serde<VOut> valSerde,
+                                                       final String queryableName,
+                                                       final Windows<W> windows,
+                                                       final SessionWindows sessionWindows,
+                                                       final Merger<? super K, VOut> sessionMerger) {
+
+        final Collection<? extends AbstractStream<K, ?>> groupedStreams = new ArrayList<>(groupPatterns.keySet());
+        final AbstractStream<K, ?> kGrouped = groupedStreams.iterator().next();
+        groupedStreams.remove(kGrouped);
+        kGrouped.ensureCopartitionWith(groupedStreams);
+
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(
+                kGroupedStream.getValue(),
+                initializer,
+                named.suffixWithOrElseGet(
+                    "-cogroup-agg-" + counter++,
+                    builder,
+                    CogroupedKStreamImpl.AGGREGATE_NAME),
+                stateCreated,
+                storeBuilder,
+                windows,
+                sessionWindows,
+                sessionMerger);
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(kGroupedStream.getKey().streamsGraphNode, statefulProcessorNode);
+        }
+        final String mergeProcessorName = named.suffixWithOrElseGet(
+            "-cogroup-merge",
+            builder,
+            CogroupedKStreamImpl.MERGE_NAME);
+        final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();
+        final ProcessorGraphNode<K, VOut> mergeNode =
+            new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
+
+        builder.addGraphNode(processors, mergeNode);
+
+        return new KTableImpl<KR, VIn, VOut>(
+            mergeProcessorName,
+            keySerde,
+            valSerde,
+            Collections.singleton(mergeNode.nodeName()),
+            queryableName,
+            passThrough,
+            mergeNode,
+            builder);
+    }
+
+    private <W extends Window> StatefulProcessorNode getStatefulProcessorNode(final Aggregator<? super K, ? super Object, VOut> aggregator,
+                                                                              final Initializer<VOut> initializer,
+                                                                              final String processorName,
+                                                                              final boolean stateCreated,
+                                                                              final StoreBuilder<? extends StateStore> storeBuilder,
+                                                                              final Windows<W> windows,
+                                                                              final SessionWindows sessionWindows,
+                                                                              final Merger<? super K, VOut> sessionMerger) {
+
+        final ProcessorSupplier<K, ?> kStreamAggregate;
+
+        if (windows == null && sessionWindows == null) {
+            kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
+        } else if (windows != null && sessionWindows == null) {
+            kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
+        } else if (windows == null && sessionMerger != null) {
+            kStreamAggregate = new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, aggregator, sessionMerger);
+        } else {
+            throw new IllegalArgumentException("must include windows OR sessionWindows + sessionMerger OR all must be null");
+        }
+
+        final StatefulProcessorNode<K, ?> statefulProcessorNode;
+        if (!stateCreated) {
+            statefulProcessorNode =
+                    new StatefulProcessorNode<>(
+                            processorName,
+                            new ProcessorParameters<>(kStreamAggregate, processorName),
+                            storeBuilder
+                    );
+        } else {
+            statefulProcessorNode =
+                    new StatefulProcessorNode<>(
+                            processorName,
+                            new ProcessorParameters<>(kStreamAggregate, processorName),
+                            new String[]{storeBuilder.name()}
+                    );
+        }
+        return statefulProcessorNode;
+    }
+
+
+}
\ No newline at end of file
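
The stateCreated flag ensures only the first aggregation node registers the
StoreBuilder; every subsequent node connects to the already-registered store by
name, and all of them feed a single PassThrough merge node. Conceptually, for
two cogrouped streams (actual processor names come from NamedInternal, see the
topology string asserted in the new test class below):

    source-1 --> agg-0 (creates the store) --\
                                              +--> merge (PassThrough) --> KTable
    source-2 --> agg-1 (reuses the store) ---/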
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 5751105..7884f30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -223,4 +224,11 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
             materializedInternal.keySerde(),
             materializedInternal.valueSerde());
     }
+
+    @Override
+    public <Vout> CogroupedKStream<K, Vout> cogroup(final Aggregator<? super K, ? super V, Vout> aggregator) {
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        return new CogroupedKStreamImpl<K, Vout>(name, sourceNodes, streamsGraphNode, builder)
+            .cogroup(this, aggregator);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bca4942..870cf53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -368,7 +368,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
 
         for (int i = 0; i < predicates.length; i++) {
-            final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
+            final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new PassThrough<K, V>(), childNames[i]);
             final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], innerProcessorParameters);
 
             builder.addGraphNode(branchNode, branchChildNode);
@@ -401,7 +401,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         allSourceNodes.addAll(sourceNodes);
         allSourceNodes.addAll(streamImpl.sourceNodes);
 
-        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new PassThrough<>(), name);
 
         final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name, processorParameters);
         mergeNode.setMergeNode(true);
@@ -796,7 +796,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
             joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
         }
 
-        joinThis.ensureJoinableWith(joinOther);
+        joinThis.ensureCopartitionWith(Collections.singleton(joinOther));
 
         return join.join(
             joinThis,
@@ -1037,7 +1037,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         Objects.requireNonNull(other, "other KTable can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
+        final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) other));
 
         final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index b77e064..9affbe9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -133,7 +133,7 @@ class KStreamImplJoin {
             rightOuter
         );
 
-        final KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
+        final PassThrough<K1, R> joinMerge = new PassThrough<>();
 
         final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K1, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f6f2ada..052a5f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -677,7 +677,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         final NamedInternal renamed = new NamedInternal(joinName);
         final String joinMergeName = renamed.orElseGenerateWithPrefix(builder, MERGE_NAME);
-        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
+        final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) other));
 
         if (leftOuter) {
             enableSendingOldValues();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
similarity index 84%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
index 2afe507..b83b3a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PassThrough.java
@@ -20,14 +20,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamPassThrough<K, V> implements ProcessorSupplier<K, V> {
+class PassThrough<K, V> implements ProcessorSupplier<K, V> {
 
     @Override
     public Processor<K, V> get() {
-        return new KStreamPassThroughProcessor<>();
+        return new PassThroughProcessor<>();
     }
 
-    private static final class KStreamPassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
+    private static final class PassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
         @Override
         public void process(final K key, final V value) {
             context().forward(key, value);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
new file mode 100644
index 0000000..ad16879
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class CogroupedKStreamImplTest {
+    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
+    private static final String TOPIC = "topic";
+    private static final String OUTPUT = "output";
+    private final StreamsBuilder builder = new StreamsBuilder();
+    private KGroupedStream<String, String> groupedStream;
+    private CogroupedKStream<String, String> cogroupedStream;
+
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+
+    private static final Aggregator<String, String, String> STRING_AGGREGATOR =
+        (key, value, aggregate) -> aggregate + value;
+
+    private static final Initializer<String> STRING_INITIALIZER = () -> "";
+
+    private static final Aggregator<String, String, Integer> STRING_SUM_AGGREGATOR =
+        (key, value, aggregate) -> aggregate + Integer.parseInt(value);
+
+    private static final Aggregator<? super String, ? super Integer, Integer> SUM_AGGREGATOR =
+        (key, value, aggregate) -> aggregate + value;
+
+    private static final Initializer<Integer> SUM_INITIALIZER = () -> 0;
+
+
+    @Before
+    public void setup() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
+        cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEInCogroupIfKGroupedStreamIsNull() {
+        cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullAggregatorOnCogroup() {
+        cogroupedStream.cogroup(groupedStream, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregate() {
+        cogroupedStream.aggregate(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregateWithNamed() {
+        cogroupedStream.aggregate(null, Named.as("name"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregateWithMaterialized() {
+        cogroupedStream.aggregate(null, Materialized.as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregateWithNamedAndMaterialized() {
+        cogroupedStream.aggregate(null, Named.as("name"), Materialized.as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullNamedOnAggregate() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullMaterializedOnAggregate() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullNamedOnAggregateWithMaterialized() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, null, Materialized.as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullMaterializedOnAggregateWithNamed() {
+        cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), null);
+    }
+
+    @Test
+    public void shouldNameProcessorsAndStoreBasedOnNamedParameter() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> groupedOne = stream1.groupByKey();
+        final KGroupedStream<String, String> groupedTwo = stream2.groupByKey();
+
+        final KTable<String, String> customers = groupedOne
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(groupedTwo, STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store"));
+
+        customers.toStream().to(OUTPUT);
+
+        final String topologyDescription = builder.build().describe().toString();
+
+        assertThat(
+            topologyDescription,
+            equalTo("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n" +
+                "      --> test-cogroup-agg-0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n" +
+                "      --> test-cogroup-agg-1\n" +
+                "    Processor: test-cogroup-agg-0 (stores: [store])\n" +
+                "      --> test-cogroup-merge\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: test-cogroup-agg-1 (stores: [store])\n" +
+                "      --> test-cogroup-merge\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: test-cogroup-merge (stores: [])\n" +
+                "      --> KTABLE-TOSTREAM-0000000005\n" +
+                "      <-- test-cogroup-agg-0, test-cogroup-agg-1\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000005 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000006\n" +
+                "      <-- test-cogroup-merge\n" +
+                "    Sink: KSTREAM-SINK-0000000006 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000005\n\n"));
+    }
+
+    @Test
+    public void shouldCogroupAndAggregateSingleKStreams() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+
+        final KTable<String, String> customers = grouped1
+            .cogroup(STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER);
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            testInputTopic.pipeInput("k1", "A", 0);
+            testInputTopic.pipeInput("k2", "B", 0);
+            testInputTopic.pipeInput("k2", "B", 0);
+            testInputTopic.pipeInput("k1", "A", 0);
+
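+            // STRING_AGGREGATOR concatenates values per key, so the second record for each key
+            // appends to the previous aggregate ("A" -> "AA", "B" -> "BB").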
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "B", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "BB", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 0);
+        }
+    }
+
+    @Test
+    public void testCogroupHandlesNullValues() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+
+        final KTable<String, String> customers = grouped1
+            .cogroup(STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER);
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            testInputTopic.pipeInput("k1", "A", 0);
+            testInputTopic.pipeInput("k2", "B", 0);
+            testInputTopic.pipeInput("k2", null, 0);
+            testInputTopic.pipeInput("k2", "B", 0);
+            testInputTopic.pipeInput("k1", "A", 0);
+
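+            // the record with a null value for k2 is skipped by the aggregation, so five inputs
+            // produce only four results and the running aggregate for k2 is unchanged.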
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "B", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "BB", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 0);
+        }
+    }
+
+    @Test
+    public void shouldCogroupAndAggregateTwoKStreamsWithDistinctKeys() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+
+        final KTable<String, String> customers = grouped1
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(grouped2, STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER);
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "A", 0);
+            testInputTopic.pipeInput("k1", "A", 1);
+            testInputTopic.pipeInput("k1", "A", 10);
+            testInputTopic.pipeInput("k1", "A", 100);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+            testInputTopic2.pipeInput("k2", "B", 200L);
+            testInputTopic2.pipeInput("k2", "B", 1L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+
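+            // result timestamps track the maximum timestamp seen for the key so far, e.g. the
+            // k2 record piped at t=1 is emitted with the running maximum of 200.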
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAA", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAAA", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "B", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "BB", 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "BBB", 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "BBBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "BBBBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "BBBBBB", 500);
+        }
+    }
+
+    @Test
+    public void shouldCogroupAndAggregateTwoKStreamsWithSharedKeys() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+
+        final KTable<String, String> customers = grouped1
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(grouped2, STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER);
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "A", 0L);
+            testInputTopic.pipeInput("k2", "A", 1L);
+            testInputTopic.pipeInput("k1", "A", 10L);
+            testInputTopic.pipeInput("k2", "A", 100L);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+            testInputTopic2.pipeInput("k2", "B", 200L);
+            testInputTopic2.pipeInput("k1", "B", 1L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic2.pipeInput("k1", "B", 500L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic2.pipeInput("k3", "B", 500L);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+
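+            // both input streams feed the same per-key aggregate, so values from "one" and "two"
+            // interleave into a single result string per key.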
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "A", 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AA", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AAB", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABB", 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAB", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AABB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBB", 500);
+        }
+    }
+
+    @Test
+    public void shouldAllowDifferentOutputTypeInCoGroup() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+
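+        // the aggregate type (Integer) differs from the input value type (String), so the value
+        // serde must be set explicitly via Materialized.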
+        final KTable<String, Integer> customers = grouped1
+            .cogroup(STRING_SUM_AGGREGATOR)
+            .cogroup(grouped2, STRING_SUM_AGGREGATOR)
+            .aggregate(
+                SUM_INITIALIZER,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("store1")
+                    .withValueSerde(Serdes.Integer()));
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, Integer> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());
+
+            testInputTopic.pipeInput("k1", "1", 0L);
+            testInputTopic.pipeInput("k2", "1", 1L);
+            testInputTopic.pipeInput("k1", "1", 10L);
+            testInputTopic.pipeInput("k2", "1", 100L);
+            testInputTopic2.pipeInput("k2", "2", 100L);
+            testInputTopic2.pipeInput("k2", "2", 200L);
+            testInputTopic2.pipeInput("k1", "2", 1L);
+            testInputTopic2.pipeInput("k2", "2", 500L);
+            testInputTopic2.pipeInput("k1", "2", 500L);
+            testInputTopic2.pipeInput("k2", "3", 500L);
+            testInputTopic2.pipeInput("k3", "2", 500L);
+            testInputTopic2.pipeInput("k2", "2", 100L);
+
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 1, 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 1, 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 2, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 2, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 4, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 6, 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 4, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 8, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 6, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 11, 500);
+        }
+    }
+
+    @Test
+    public void shouldCoGroupStreamsWithDifferentInputTypes() {
+        final Consumed<String, Integer> integerConsumed = Consumed.with(Serdes.String(), Serdes.Integer());
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, Integer> stream2 = builder.stream("two", integerConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, Integer> grouped2 = stream2.groupByKey();
+
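+        // each grouped stream supplies its own Aggregator; only the aggregate result type
+        // (Integer) must match across the cogrouped streams, the input value types may differ.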
+        final KTable<String, Integer> customers = grouped1
+            .cogroup(STRING_SUM_AGGREGATOR)
+            .cogroup(grouped2, SUM_AGGREGATOR)
+            .aggregate(
+                SUM_INITIALIZER,
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("store1")
+                    .withValueSerde(Serdes.Integer()));
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, Integer> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new IntegerSerializer());
+            final TestOutputTopic<String, Integer> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());
+            testInputTopic.pipeInput("k1", "1", 0L);
+            testInputTopic.pipeInput("k2", "1", 1L);
+            testInputTopic.pipeInput("k1", "1", 10L);
+            testInputTopic.pipeInput("k2", "1", 100L);
+
+            testInputTopic2.pipeInput("k2", 2, 100L);
+            testInputTopic2.pipeInput("k2", 2, 200L);
+            testInputTopic2.pipeInput("k1", 2, 1L);
+            testInputTopic2.pipeInput("k2", 2, 500L);
+            testInputTopic2.pipeInput("k1", 2, 500L);
+            testInputTopic2.pipeInput("k2", 3, 500L);
+            testInputTopic2.pipeInput("k3", 2, 500L);
+            testInputTopic2.pipeInput("k2", 2, 100L);
+
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 1, 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 1, 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 2, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 2, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 4, 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 6, 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 4, 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 8, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", 6, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", 11, 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k3", 2, 500);
+        }
+    }
+
+    @Test
+    public void testCogroupKeyMixedAggregators() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+
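+        // stream one's aggregator appends values with "-" while stream two's appends with "+",
+        // exercising distinct aggregation logic per cogrouped stream.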
+        final KTable<String, String> customers = grouped1
+            .cogroup(MockAggregator.TOSTRING_REMOVER)
+            .cogroup(grouped2, MockAggregator.TOSTRING_ADDER)
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store1")
+                    .withValueSerde(Serdes.String()));
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "1", 0L);
+            testInputTopic.pipeInput("k2", "1", 1L);
+            testInputTopic.pipeInput("k1", "1", 10L);
+            testInputTopic.pipeInput("k2", "1", 100L);
+            testInputTopic2.pipeInput("k1", "2", 500L);
+            testInputTopic2.pipeInput("k2", "2", 500L);
+            testInputTopic2.pipeInput("k1", "2", 500L);
+            testInputTopic2.pipeInput("k2", "2", 100L);
+
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1", 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1-1", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1-1", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1-1+2", 500L);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1-1+2", 500L);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-1-1+2+2", 500L);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-1-1+2+2", 500L);
+        }
+    }
+
+    @Test
+    public void testCogroupWithThreeGroupedStreams() {
+        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
+        final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
+        final KStream<String, String> stream3 = builder.stream("three", stringConsumed);
+
+        final KGroupedStream<String, String> grouped1 = stream1.groupByKey();
+        final KGroupedStream<String, String> grouped2 = stream2.groupByKey();
+        final KGroupedStream<String, String> grouped3 = stream3.groupByKey();
+
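+        // cogroup is not limited to two inputs; a third grouped stream joins the same aggregate.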
+        final KTable<String, String> customers = grouped1
+            .cogroup(STRING_AGGREGATOR)
+            .cogroup(grouped2, STRING_AGGREGATOR)
+            .cogroup(grouped3, STRING_AGGREGATOR)
+            .aggregate(STRING_INITIALIZER);
+
+        customers.toStream().to(OUTPUT);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> testInputTopic =
+                driver.createInputTopic("one", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic2 =
+                driver.createInputTopic("two", new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> testInputTopic3 =
+                driver.createInputTopic("three", new StringSerializer(), new StringSerializer());
+
+            final TestOutputTopic<String, String> testOutputTopic =
+                driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+
+            testInputTopic.pipeInput("k1", "A", 0L);
+            testInputTopic.pipeInput("k2", "A", 1L);
+            testInputTopic.pipeInput("k1", "A", 10L);
+            testInputTopic.pipeInput("k2", "A", 100L);
+            testInputTopic2.pipeInput("k2", "B", 100L);
+            testInputTopic2.pipeInput("k2", "B", 200L);
+            testInputTopic2.pipeInput("k1", "B", 1L);
+            testInputTopic2.pipeInput("k2", "B", 500L);
+            testInputTopic3.pipeInput("k1", "B", 500L);
+            testInputTopic3.pipeInput("k2", "B", 500L);
+            testInputTopic3.pipeInput("k3", "B", 500L);
+            testInputTopic3.pipeInput("k2", "B", 100L);
+
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "A", 0);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "A", 1);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AA", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AA", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AAB", 100);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABB", 200);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AAB", 10);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "AABB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "AABBBB", 500);
+            assertOutputKeyValueTimestamp(testOutputTopic, "k3", "B", 500);
+        }
+    }
+
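+    // overloaded helpers asserting key, value, and timestamp of the next output record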
+    private void assertOutputKeyValueTimestamp(final TestOutputTopic<String, String> outputTopic,
+                                               final String expectedKey,
+                                               final String expectedValue,
+                                               final long expectedTimestamp) {
+        assertThat(
+            outputTopic.readRecord(),
+            equalTo(new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp)));
+    }
+
+    private void assertOutputKeyValueTimestamp(final TestOutputTopic<String, Integer> outputTopic,
+                                               final String expectedKey,
+                                               final Integer expectedValue,
+                                               final long expectedTimestamp) {
+        assertThat(
+            outputTopic.readRecord(),
+            equalTo(new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp)));
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 7cfc8c2..f2e6e1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -82,6 +82,11 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullAggregatorOnCogroup() {
+        groupedStream.cogroup(null);
+    }
+
+    @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullReducerOnReduce() {
         groupedStream.reduce(null);
     }
@@ -471,8 +476,6 @@ public class KGroupedStreamImplTest {
         }
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldReduceAndMaterializeResults() {
         groupedStream.reduce(
@@ -544,8 +547,6 @@ public class KGroupedStreamImplTest {
         }
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAggregateAndMaterializeResults() {
         groupedStream.aggregate(
@@ -575,7 +576,6 @@ public class KGroupedStreamImplTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAggregateWithDefaultSerdes() {
         final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();