Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/12 18:24:00 UTC

[jira] [Commented] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

    [ https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473237#comment-16473237 ] 

ASF GitHub Bot commented on KAFKA-6049:
---------------------------------------

ConcurrencyPractitioner closed pull request #4646: [KAFKA-6049] Kafka Streams: Add Cogroup in the DSL
URL: https://github.com/apache/kafka/pull/4646
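
For orientation, here is a minimal, Kafka-free sketch of what cogrouping is meant to achieve: records from several keyed streams are folded into one shared aggregate per key, with each stream contributing through its own aggregator. The names CogroupSketch, Record and demo are hypothetical and are not part of this PR. Only the shape of Aggregator mirrors the Kafka Streams interface of the same name, and the three aggregators copy the ones used in the PR's integration test. It is an illustration under those assumptions, not the implementation in the diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of cogroup semantics; it does not use Kafka Streams.
class CogroupSketch {

    // Same shape as Kafka Streams' Aggregator: (key, value, aggregate) -> new aggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // One input record: the stream it arrived on, its key, and its value.
    static final class Record {
        final int stream;
        final long key;
        final String value;

        Record(final int stream, final long key, final String value) {
            this.stream = stream;
            this.key = key;
            this.value = value;
        }
    }

    // Folds every record into a single table, picking the aggregator by source stream.
    static Map<Long, String> cogroup(final List<Record> records,
                                     final Map<Integer, Aggregator<Long, String, String>> aggregators) {
        final Map<Long, String> table = new LinkedHashMap<>();
        for (final Record r : records) {
            // The empty string plays the role of the Initializer.
            final String current = table.getOrDefault(r.key, "");
            table.put(r.key, aggregators.get(r.stream).apply(r.key, r.value, current));
        }
        return table;
    }

    static Map<Long, String> demo() {
        final Map<Integer, Aggregator<Long, String, String>> aggregators = new HashMap<>();
        aggregators.put(1, (k, v, agg) -> agg + "1" + v);
        aggregators.put(2, (k, v, agg) -> agg + "2" + v);
        aggregators.put(3, (k, v, agg) -> agg + "3" + v);

        final List<Record> records = Arrays.asList(
            new Record(1, 1L, "a"),
            new Record(2, 2L, "a"),
            new Record(3, 1L, "a"),
            new Record(1, 1L, "b"));
        return cogroup(records, aggregators);
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints {1=1a3a1b, 2=2a}
    }
}
```

Key 1 receives contributions from streams 1 and 3 in arrival order, while key 2 only sees stream 2. In the real DSL the same result would come from one state store shared by all the aggregate processors, which is what the diff wires up.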
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a fork)
on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
new file mode 100644
index 00000000000..a1aaa1194f5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
+ * It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
+ * operations on the original {@link KStream} records.
+ * <p>
+ * It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
+ * the new partitions resulting in a {@link KTable}.
+ * <p>
+ * A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
+ * {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
+ *
+ * @param <K> Type of keys
+ * @param <RK> Type of key in table, either K or Windowed&lt;K&gt;
+ * @param <V> Type of aggregate values
+ * @see KGroupedStream
+ */
+@InterfaceStability.Unstable
+public interface CogroupedKStream<K, V> {
+
+    <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T> groupedStream,
+                                       final Aggregator<? super K, ? super T, V> aggregator);
+
+    KTable<K, V> aggregate(final Initializer<V> initializer,
+                           final Serde<V> valueSerde,
+                           final String storeName);
+    
+    KTable<K, V> aggregate(final Initializer<V> initializer,
+                           final StateStoreSupplier<KeyValueStore> storeSupplier);
+
+    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                     final Merger<? super K, V> sessionMerger,
+                                     final SessionWindows sessionWindows,
+                                     final Serde<V> valueSerde,
+                                     final String storeName);
+
+    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                     final Merger<? super K, V> sessionMerger,
+                                     final SessionWindows sessionWindows,
+                                     final StateStoreSupplier<SessionStore> storeSupplier);
+
+    <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                                        final Windows<W> windows,
+                                                        final Serde<V> valueSerde,
+                                                        final String storeName);
+
+    <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                                        final Windows<W> windows,
+                                                        final StateStoreSupplier<WindowStore> storeSupplier);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 29de64c1e67..96154e6b322 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -1594,4 +1594,5 @@
      */
     SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows);
 
+    <T> CogroupedKStream<K, T> cogroup(final Aggregator<? super K, ? super V, T> aggregator);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index d747ce8d049..ffc307326f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -50,7 +50,15 @@
 @Deprecated
 public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyBuilder {
 
-    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
+    private final InternalStreamsBuilder internalStreamsBuilder;
+    
+    public KStreamBuilder() {
+        internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
+    }
+
+    public KStreamBuilder(InternalStreamsBuilder internalStreamsBuilder) {
+        this.internalStreamsBuilder = internalStreamsBuilder;
+    }
 
     private Topology.AutoOffsetReset translateAutoOffsetReset(final org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset resetPolicy) {
         if (resetPolicy == null) {
@@ -1266,4 +1274,7 @@ public String newStoreName(final String prefix) {
         return internalStreamsBuilder.newStoreName(prefix);
     }
 
+    public InternalStreamsBuilder internalStreamsBuilder() {
+        return internalStreamsBuilder;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
new file mode 100644
index 00000000000..7857a43d043
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+
+class CogroupedKStreamImpl<K, V> implements CogroupedKStream<K, V> {
+
+    private final AtomicInteger index = new AtomicInteger(0);
+    private static final String COGROUP_AGGREGATE_NAME = "KSTREAM-COGROUP-AGGREGATE-";
+    private static final String COGROUP_NAME = "KSTREAM-COGROUP-";
+    private enum AggregateType {
+        AGGREGATE,
+        SESSION_WINDOW_AGGREGATE,
+        WINDOW_AGGREGATE
+    }
+
+    private final InternalStreamsBuilder builder;
+    private final Serde<K> keySerde;
+    private final Map<KGroupedStream, Aggregator> pairs = new HashMap<>();
+    private final Map<KGroupedStreamImpl, String> repartitionNames = new HashMap<>();
+
+    <T> CogroupedKStreamImpl(final InternalStreamsBuilder builder,
+                         final KGroupedStream<K, T> groupedStream,
+                         final Serde<K> keySerde,
+                         final Aggregator<? super K, ? super T, V> aggregator) {
+        this.builder = builder;
+        this.keySerde = keySerde;
+        cogroup(groupedStream, aggregator);
+    }
+
+    @Override
+    public <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T> groupedStream,
+                                              final Aggregator<? super K, ? super T, V> aggregator) {
+        Objects.requireNonNull(groupedStream, "groupedStream can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        pairs.put(groupedStream, aggregator);
+        return this;
+    }
+
+    @Override
+    public KTable<K, V> aggregate(final Initializer<V> initializer,
+                                  final Serde<V> valueSerde,
+                                  final String storeName) {
+        return aggregate(initializer, AbstractStream.keyValueStore(keySerde, valueSerde, storeName));
+    }
+
+    @Override
+    public KTable<K, V> aggregate(final Initializer<V> initializer,
+                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doAggregate(AggregateType.AGGREGATE, initializer, storeSupplier, null, null, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Merger<? super K, V> sessionMerger,
+                                            final SessionWindows sessionWindows,
+                                            final Serde<V> valueSerde,
+                                            final String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
+        return aggregate(initializer, sessionMerger, sessionWindows, AbstractStream.storeFactory(keySerde, valueSerde, storeName).sessionWindowed(sessionWindows.maintainMs()).build());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Merger<? super K, V> sessionMerger,
+                                            final SessionWindows sessionWindows,
+                                            final StateStoreSupplier<SessionStore> storeSupplier) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+        Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return (KTable<Windowed<K>, V>) doAggregate(AggregateType.SESSION_WINDOW_AGGREGATE, initializer, storeSupplier, sessionMerger, sessionWindows, null);
+    }
+
+    @Override
+    public <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                                               final Windows<W> windows,
+                                                               final Serde<V> valueSerde,
+                                                               final String storeName) {
+        return aggregate(initializer, windows, AbstractStream.windowedStore(keySerde, valueSerde, windows, storeName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                                               final Windows<W> windows,
+                                                               final StateStoreSupplier<WindowStore> storeSupplier) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(windows, "windows can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return (KTable<Windowed<K>, V>) doAggregate(AggregateType.WINDOW_AGGREGATE, initializer, storeSupplier, null, null, windows);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <W extends Window> KTable<K, V> doAggregate(final AggregateType aggregateType,
+                                                        final Initializer<V> initializer,
+                                                        final StateStoreSupplier storeSupplier,
+                                                        final Merger<? super K, V> sessionMerger,
+                                                        final SessionWindows sessionWindows,
+                                                        final Windows<W> windows) {
+        final Set<String> sourceNodes = new HashSet<>();
+        final Collection<KStreamAggProcessorSupplier> processors = new ArrayList<>();
+        final List<String> processorNames = new ArrayList<>();
+        for (final Map.Entry<KGroupedStream, Aggregator> pair : pairs.entrySet()) {
+            final KGroupedStreamImpl groupedStream = (KGroupedStreamImpl) pair.getKey();
+            final String sourceName = repartitionIfRequired(groupedStream);
+            if (sourceName.equals(groupedStream.name)) {
+                sourceNodes.addAll(groupedStream.sourceNodes);
+            } else {
+                sourceNodes.add(sourceName);
+            }
+
+            final KStreamAggProcessorSupplier processor;
+            switch (aggregateType) {
+                case AGGREGATE:
+                    processor = new KStreamAggregate(storeSupplier.name(), initializer, pair.getValue());
+                    break;
+                case SESSION_WINDOW_AGGREGATE:
+                    processor = new KStreamSessionWindowAggregate(sessionWindows, storeSupplier.name(), initializer, pair.getValue(), sessionMerger);
+                    break;
+                case WINDOW_AGGREGATE:
+                    processor = new KStreamWindowAggregate(windows, storeSupplier.name(), initializer, pair.getValue());
+                    break;
+                default:
+                    throw new IllegalStateException("Unrecognized AggregateType.");
+            }
+            processors.add(processor);
+            final String processorName = newName(COGROUP_AGGREGATE_NAME);
+            processorNames.add(processorName); // otherwise the cogroup node and state store get no parents
+            final String[] sourceNames = {sourceName};
+            builder.internalTopologyBuilder.addProcessor(processorName, processor, sourceNames);
+        }
+        final String name = newName(COGROUP_NAME);
+        final KStreamCogroup cogroup = new KStreamCogroup(processors);
+        final String[] processorNamesArray = processorNames.toArray(new String[processorNames.size()]);
+        builder.internalTopologyBuilder.addProcessor(name, cogroup, processorNamesArray);
+        builder.internalTopologyBuilder.addStateStore(storeSupplier, processorNamesArray);
+        builder.internalTopologyBuilder.copartitionSources(sourceNodes);
+        return new KTableImpl<K, String, V>(builder, name, cogroup, sourceNodes, storeSupplier.name(), true);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private String repartitionIfRequired(KGroupedStreamImpl groupedStream) {
+        if (repartitionNames.containsKey(groupedStream)) {
+            return repartitionNames.get(groupedStream);
+        }
+        final String sourceName = groupedStream.repartitionIfRequired(null);
+        repartitionNames.put(groupedStream, sourceName);
+        return sourceName;
+    }
+    
+    private String newName(String prefix) {
+        return prefix + String.format("%010d", index.getAndIncrement());
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index e4429cc5e88..1528b6ab79a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -46,6 +46,10 @@ public Long apply(K aggKey, V value, Long aggregate) {
         }
     };
 
+    public InternalStreamsBuilder internalStreamsBuilder() {
+        return builder;
+    }
+
     GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
                                   final Serde<K> keySerde,
                                   final Serde<V> valueSerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index fa47444c3c8..6395b833244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -167,10 +168,6 @@ public String newStoreName(final String prefix) {
         return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
     }
 
-    public synchronized void addStateStore(final StoreBuilder builder) {
-        internalTopologyBuilder.addStateStore(builder);
-    }
-
     public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
                                             final String sourceName,
                                             final String topic,
@@ -204,4 +201,8 @@ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeB
                        processorName,
                        stateUpdateSupplier);
     }
+    
+    public synchronized void addStateStore(StoreBuilder storeBuilder) {
+        internalTopologyBuilder.addStateStore(storeBuilder);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 45ae7da7e0f..9fae052c5d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -192,6 +193,13 @@ private void determineIsQueryable(final String queryableStoreName) {
                            materializedInternal);
 
     }
+    
+    @Override
+    public <VR> CogroupedKStream<K, VR> cogroup(final Aggregator<? super K, ? super V, VR> aggregator) {
+        Objects.requireNonNull(aggregator, "aggregator should not be null");
+        return new CogroupedKStreamImpl<>(builder,
+                                          this, keySerde, aggregator);
+    }
 
     @SuppressWarnings("deprecation")
     @Override
@@ -515,7 +523,7 @@ public V apply(final K aggKey, final V aggOne, final V aggTwo) {
     /**
      * @return the new sourceName if repartitioned. Otherwise the name of this stream
      */
-    private String repartitionIfRequired(final String queryableStoreName) {
+    String repartitionIfRequired(final String queryableStoreName) {
         if (!repartitionRequired) {
             return this.name;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamCogroup.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamCogroup.java
new file mode 100644
index 00000000000..de800749dda
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamCogroup.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+
+class KStreamCogroup<K, V> implements KStreamAggProcessorSupplier<K, K, Change<V>, V> {
+
+    private final List<KStreamAggProcessorSupplier> parents;
+
+    private boolean sendOldValues = false;
+
+    KStreamCogroup(Collection<KStreamAggProcessorSupplier> parents) {
+        this.parents = new ArrayList<>(parents);
+    }
+
+    @Override
+    public Processor<K, Change<V>> get() {
+        return new KStreamCogroupProcessor();
+    }
+    
+    private final class KStreamCogroupProcessor extends AbstractProcessor<K, Change<V>> {
+        @Override
+        public void process(K key, Change<V> value) {
+            if (sendOldValues) {
+                context().forward(key, value);
+            } else {
+                context().forward(key, new Change<>(value.newValue, null));
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        return (KTableValueGetterSupplier<K, V>) parents.get(0).view();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        for (KStreamAggProcessorSupplier parent : parents) {
+            parent.enableSendingOldValues();
+        }
+        sendOldValues = true;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 739792f1877..bc5985fe305 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -24,7 +24,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 
-class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
     private static final int SUFFIX_SIZE = WindowStoreUtils.TIMESTAMP_SIZE + WindowStoreUtils.SEQNUM_SIZE;
     private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamCogroupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamCogroupIntegrationTest.java
new file mode 100644
index 00000000000..0eb15a5d740
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamCogroupIntegrationTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IntegrationTest.class})
+public class KStreamCogroupIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static final String APP_ID = "cogroup-integration-test-";
+    private static final String COGROUP_STORE_NAME = "cogroup";
+    private static final String TABLE_STORE_NAME = "table";
+    private static final String INPUT_TOPIC_1 = "input-topic-1-";
+    private static final String INPUT_TOPIC_2 = "input-topic-2-";
+    private static final String INPUT_TOPIC_3 = "input-topic-3-";
+    private static final String OUTPUT_TOPIC = "output-topic-";
+    private static final AtomicInteger TEST_NUMBER = new AtomicInteger();
+    private static final Initializer<String> INITIALIZER = new Initializer<String>() {
+            @Override
+            public String apply() {
+                return "";
+            }
+        };
+    private static final Aggregator<Long, String, String> AGGREGATOR_1 = new Aggregator<Long, String, String>() {
+            @Override
+            public String apply(Long key, String value, String aggregate) {
+                return aggregate + "1" + value;
+            }
+        };
+    private static final Aggregator<Long, String, String> AGGREGATOR_2 = new Aggregator<Long, String, String>() {
+            @Override
+            public String apply(Long key, String value, String aggregate) {
+                return aggregate + "2" + value;
+            }
+        };
+    private static final Aggregator<Long, String, String> AGGREGATOR_3 = new Aggregator<Long, String, String>() {
+            @Override
+            public String apply(Long key, String value, String aggregate) {
+                return aggregate + "3" + value;
+            }
+        };
+    private static final KeyValueMapper<Long, String, Long> GROUP_BY = new KeyValueMapper<Long, String, Long>() {
+            @Override
+            public Long apply(Long key, String value) {
+                return key * 2;
+            }
+        };
+    private static final ValueJoiner<String, String, String> JOINER = new ValueJoiner<String, String, String>() {
+            @Override
+            public String apply(String value1, String value2) {
+                return value1 + "+" + value2;
+            }
+        };
+    private static final Merger<Long, String> MERGER = new Merger<Long, String>() {
+            @Override
+            public String apply(Long aggKey, String aggOne, String aggTwo) {
+                return aggOne + aggTwo;
+            }
+        };
+    private static final Properties PRODUCER_CONFIG = new Properties();
+    private static final Properties CONSUMER_CONFIG = new Properties();
+    private static final Properties STREAMS_CONFIG = new Properties();
+    private static final List<Input<Long, String>> INPUTS = Arrays.asList(
+            new Input<>(INPUT_TOPIC_1, 10L, new KeyValue<>(1L, "a")),
+            new Input<>(INPUT_TOPIC_2, 10L, new KeyValue<>(2L, "a")),
+            new Input<>(INPUT_TOPIC_3, 11L, new KeyValue<>(1L, "a")),
+            new Input<>(INPUT_TOPIC_1, 11L, new KeyValue<>(1L, "b")),
+            new Input<>(INPUT_TOPIC_2, 12L, new KeyValue<>(2L, "b")),
+            new Input<>(INPUT_TOPIC_3, 12L, new KeyValue<>(1L, "b")),
+            new Input<>(INPUT_TOPIC_1, 20L, new KeyValue<>(2L, "c")),
+            new Input<>(INPUT_TOPIC_2, 20L, new KeyValue<>(1L, "c")),
+            new Input<>(INPUT_TOPIC_3, 21L, new KeyValue<>(2L, "c")),
+            new Input<>(INPUT_TOPIC_1, 21L, new KeyValue<>(2L, "a")),
+            new Input<>(INPUT_TOPIC_2, 22L, new KeyValue<>(1L, "a")),
+            new Input<>(INPUT_TOPIC_3, 22L, new KeyValue<>(2L, "a")),
+            new Input<>(INPUT_TOPIC_1, 16L, new KeyValue<>(2L, "b")),
+            new Input<>(INPUT_TOPIC_2, 16L, new KeyValue<>(1L, "b")),
+            new Input<>(INPUT_TOPIC_3, 17L, new KeyValue<>(2L, "b")),
+            new Input<>(INPUT_TOPIC_1, 17L, new KeyValue<>(1L, "c")),
+            new Input<>(INPUT_TOPIC_2, 18L, new KeyValue<>(2L, "c")),
+            new Input<>(INPUT_TOPIC_3, 18L, new KeyValue<>(1L, "c"))
+        );
+
+    @BeforeClass
+    public static void setupConfigs() throws Exception {
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+    }
+
+    @Test
+    public void testCogroup() throws InterruptedException, ExecutionException {
+        final int testNumber = TEST_NUMBER.getAndIncrement();
+        final Properties producerConfig = producerConfig();
+        final Properties consumerConfig = consumerConfig("consumer-" + testNumber);
+        CLUSTER.createTopic(INPUT_TOPIC_1 + testNumber, 2, 1);
+        CLUSTER.createTopic(INPUT_TOPIC_2 + testNumber, 2, 1);
+        CLUSTER.createTopic(INPUT_TOPIC_3 + testNumber, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC + testNumber, 2, 1);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        KGroupedStream<Long, String> stream1 = builder.<Long, String>stream(INPUT_TOPIC_1 + testNumber).groupByKey();
+        KGroupedStream<Long, String> stream2 = builder.<Long, String>stream(INPUT_TOPIC_2 + testNumber).groupByKey();
+        KGroupedStream<Long, String> stream3 = builder.<Long, String>stream(INPUT_TOPIC_3 + testNumber).groupByKey();
+        stream1.cogroup(AGGREGATOR_1)
+                .cogroup(stream2, AGGREGATOR_2)
+                .cogroup(stream3, AGGREGATOR_3)
+                .aggregate(INITIALIZER, null, COGROUP_STORE_NAME)
+                .to(OUTPUT_TOPIC + testNumber);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig(APP_ID + testNumber));
+
+        final List<KeyValue<Long, String>> expecteds = Arrays.asList(
+                KeyValue.pair(1L, "1a"),
+                KeyValue.pair(2L, "2a"),
+                KeyValue.pair(1L, "1a3a"),
+                KeyValue.pair(1L, "1a3a1b"),
+                KeyValue.pair(2L, "2a2b"),
+                KeyValue.pair(1L, "1a3a1b3b"),
+                KeyValue.pair(2L, "2a2b1c"),
+                KeyValue.pair(1L, "1a3a1b3b2c"),
+                KeyValue.pair(2L, "2a2b1c3c"),
+                KeyValue.pair(2L, "2a2b1c3c1a"),
+                KeyValue.pair(1L, "1a3a1b3b2c2a"),
+                KeyValue.pair(2L, "2a2b1c3c1a3a"),
+                KeyValue.pair(2L, "2a2b1c3c1a3a1b"),
+                KeyValue.pair(1L, "1a3a1b3b2c2a2b"),
+                KeyValue.pair(2L, "2a2b1c3c1a3a1b3b"),
+                KeyValue.pair(1L, "1a3a1b3b2c2a2b1c"),
+                KeyValue.pair(2L, "2a2b1c3c1a3a1b3b2c"),
+                KeyValue.pair(1L, "1a3a1b3b2c2a2b1c3c")
+            );
+
+        try {
+            streams.start();
+
+            final Iterator<KeyValue<Long, String>> expectedsIterator = expecteds.iterator();
+            for (final Input<Long, String> input : INPUTS) {
+                IntegrationTestUtils.produceKeyValuesSynchronously(input.topic + testNumber, Collections.singleton(input.keyValue), producerConfig, CLUSTER.time);
+                List<KeyValue<Long, String>> outputs = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_TOPIC + testNumber, 1);
+                assertThat(outputs.get(0), equalTo(expectedsIterator.next()));
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
+    public void testCogroupRepartition() throws InterruptedException, ExecutionException {
+        final int testNumber = TEST_NUMBER.getAndIncrement();
+        final Properties producerConfig = producerConfig();
+        final Properties consumerConfig = consumerConfig("consumer-" + testNumber);
+        CLUSTER.createTopic(INPUT_TOPIC_1 + testNumber, 2, 1);
+        CLUSTER.createTopic(INPUT_TOPIC_2 + testNumber, 2, 1);
+        CLUSTER.createTopic(INPUT_TOPIC_3 + testNumber, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC + testNumber, 2, 1);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        KGroupedStream<Long, String> stream1 = builder.<Long, String>stream(INPUT_TOPIC_1 + testNumber).groupBy(GROUP_BY);
+        KGroupedStream<Long, String> stream2 = builder.<Long, String>stream(INPUT_TOPIC_2 + testNumber).groupBy(GROUP_BY);
+        KGroupedStream<Long, String> stream3 = builder.<Long, String>stream(INPUT_TOPIC_3 + testNumber).groupBy(GROUP_BY);
+        stream1.cogroup(AGGREGATOR_1)
+                .cogroup(stream2, AGGREGATOR_2)
+                .cogroup(stream3, AGGREGATOR_3)
+                .aggregate(INITIALIZER, null, COGROUP_STORE_NAME)
+                .to(OUTPUT_TOPIC + testNumber);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig(APP_ID + testNumber));
+
+        final List<KeyValue<Long, String>> expecteds = Arrays.asList(
+                KeyValue.pair(2L, "1a"), // Key 1
+                KeyValue.pair(4L, "2a"), // Key 2
+                KeyValue.pair(2L, "1a3a"), // Key 1
+                KeyValue.pair(2L, "1a3a1b"), // Key 1
+                KeyValue.pair(4L, "2a2b"), // Key 2
+                KeyValue.pair(2L, "1a3a1b3b"), // Key 1
+                KeyValue.pair(4L, "2a2b1c"), // Key 2
+                KeyValue.pair(2L, "1a3a1b3b2c"), // Key 1
+                KeyValue.pair(4L, "2a2b1c3c"), // Key 2
+                KeyValue.pair(4L, "2a2b1c3c1a"), // Key 2
+                KeyValue.pair(2L, "1a3a1b3b2c2a"), // Key 1
+                KeyValue.pair(4L, "2a2b1c3c1a3a"), // Key 2
+                KeyValue.pair(4L, "2a2b1c3c1a3a1b"), // Key 2
+                KeyValue.pair(2L, "1a3a1b3b2c2a2b"), // Key 1
+                KeyValue.pair(4L, "2a2b1c3c1a3a1b3b"), // Key 2
+                KeyValue.pair(2L, "1a3a1b3b2c2a2b1c"), // Key 1
+                KeyValue.pair(4L, "2a2b1c3c1a3a1b3b2c"), // Key 2
+                KeyValue.pair(2L, "1a3a1b3b2c2a2b1c3c") // Key 1
+            );
+
+        try {
+            streams.start();
+
+            final Iterator<KeyValue<Long, String>> expectedsIterator = expecteds.iterator();
+            for (final Input<Long, String> input : INPUTS) {
+                IntegrationTestUtils.produceKeyValuesSynchronously(input.topic + testNumber, Collections.singleton(input.keyValue), producerConfig, CLUSTER.time);
+                List<KeyValue<Long, String>> outputs = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_TOPIC + testNumber, 1);
+                assertThat(outputs.get(0), equalTo(expectedsIterator.next()));
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
+    public void testCogroupViewAndEnableSendingOldValues() throws InterruptedException, ExecutionException {
+        final int testNumber = TEST_NUMBER.getAndIncrement();
+        final Properties producerConfig = producerConfig();
+        final Properties consumerConfig = consumerConfig("consumer-" + testNumber);
+        CLUSTER.createTopic(INPUT_TOPIC_1 + testNumber, 2, 1);
+        CLUSTER.createTopic(INPUT_TOPIC_2 + testNumber, 2, 1);
+        CLUSTER.createTopic(INPUT_TOPIC_3 + testNumber, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC + testNumber, 2, 1);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        KGroupedStream<Long, String> stream1 = builder.<Long, String>stream(INPUT_TOPIC_1 + testNumber).groupByKey();
+        KGroupedStream<Long, String> stream2 = builder.<Long, String>stream(INPUT_TOPIC_2 + testNumber).groupByKey();
+        KTable<Long, String> table = builder.table(INPUT_TOPIC_3 + testNumber);
+        stream1.cogroup(AGGREGATOR_1)
+                .cogroup(stream2, AGGREGATOR_2)
+                .aggregate(INITIALIZER, null, COGROUP_STORE_NAME)
+                .outerJoin(table, JOINER)
+                .to(OUTPUT_TOPIC + testNumber);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig(APP_ID + testNumber));
+
+        final List<KeyValue<Long, String>> expecteds = Arrays.asList(
+                KeyValue.pair(1L, "1a+null"),
+                KeyValue.pair(2L, "2a+null"),
+                KeyValue.pair(1L, "1a+a"),
+                KeyValue.pair(1L, "1a1b+a"),
+                KeyValue.pair(2L, "2a2b+null"),
+                KeyValue.pair(1L, "1a1b+b"),
+                KeyValue.pair(2L, "2a2b1c+null"),
+                KeyValue.pair(1L, "1a1b2c+b"),
+                KeyValue.pair(2L, "2a2b1c+c"),
+                KeyValue.pair(2L, "2a2b1c1a+c"),
+                KeyValue.pair(1L, "1a1b2c2a+b"),
+                KeyValue.pair(2L, "2a2b1c1a+a"),
+                KeyValue.pair(2L, "2a2b1c1a1b+a"),
+                KeyValue.pair(1L, "1a1b2c2a2b+b"),
+                KeyValue.pair(2L, "2a2b1c1a1b+b"),
+                KeyValue.pair(1L, "1a1b2c2a2b1c+b"),
+                KeyValue.pair(2L, "2a2b1c1a1b2c+b"),
+                KeyValue.pair(1L, "1a1b2c2a2b1c+c")
+            );
+
+        try {
+            streams.start();
+
+            final Iterator<KeyValue<Long, String>> expectedsIterator = expecteds.iterator();
+            for (final Input<Long, String> input : INPUTS) {
+                IntegrationTestUtils.produceKeyValuesSynchronously(input.topic + testNumber, Collections.singleton(input.keyValue), producerConfig, CLUSTER.time);
+                List<KeyValue<Long, String>> outputs = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_TOPIC + testNumber, 1);
+                assertThat(outputs.get(0), equalTo(expectedsIterator.next()));
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    private static Properties producerConfig() {
+        return PRODUCER_CONFIG;
+    }
+
+    private static Properties consumerConfig(final String groupId) {
+        final Properties consumerConfig = new Properties();
+        consumerConfig.putAll(CONSUMER_CONFIG);
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        return consumerConfig;
+    }
+
+    private static Properties streamsConfig(final String applicationId) {
+        final Properties streamsConfig = new Properties();
+        streamsConfig.putAll(STREAMS_CONFIG);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        return streamsConfig;
+    }
+
+    private static final class Input<K, V> {
+        String topic;
+        long timestamp;
+        KeyValue<K, V> keyValue;
+
+        Input(final String topic, final long timestamp, final KeyValue<K, V> keyValue) {
+            this.topic = topic;
+            this.timestamp = timestamp;
+            this.keyValue = keyValue;
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
new file mode 100644
index 00000000000..d74d73e4a6f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.CogroupedKStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CogroupedKStreamImplTest {
+
+    private static final String TOPIC_1 = "topic-1";
+    private static final String TOPIC_2 = "topic-2";
+    private static final String INVALID_STORE_NAME = "~foo bar~";
+    private final KStreamBuilder builder = new KStreamBuilder();
+    private CogroupedKStream<String, String> cogroupedStream;
+    private KStreamTestDriver driver = null;
+
+    private final Merger<String, String> sessionMerger = new Merger<String, String>() {
+        @Override
+        public String apply(final String aggKey, final String aggOne, final String aggTwo) {
+            return null;
+        }
+    };
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), TOPIC_1);
+        cogroupedStream = stream.groupByKey(Serdes.String(), Serdes.String()).cogroup(MockAggregator.TOSTRING_ADDER);
+    }
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullKGroupedStreamOnCogroup() throws Exception {
+        cogroupedStream.cogroup(null, MockAggregator.TOSTRING_ADDER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullAggregatorOnCogroup() throws Exception {
+        cogroupedStream.cogroup(builder.stream(Serdes.String(), Serdes.String(), TOPIC_2).groupByKey(), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
+        cogroupedStream.aggregate(null, Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, Serdes.String(), null);
+    }
+
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotHaveInvalidStoreNameOnAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, Serdes.String(), INVALID_STORE_NAME);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(null, TimeWindows.of(10), Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, TimeWindows.of(10), Serdes.String(), null);
+    }
+
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, TimeWindows.of(10), Serdes.String(), INVALID_STORE_NAME);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, TimeWindows.of(10), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnSessionWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(null, sessionMerger, SessionWindows.with(10), Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullSessionMergerOnSessionWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, null, SessionWindows.with(10), Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullWindowsOnSessionWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, null, Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreNameOnSessionWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, SessionWindows.with(10), Serdes.String(), null);
+    }
+
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotHaveInvalidStoreNameOnSessionWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, SessionWindows.with(10), Serdes.String(), INVALID_STORE_NAME);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreSupplierOnSessionWindowedAggregate() throws Exception {
+        cogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, SessionWindows.with(10), null);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamCogroupTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamCogroupTest.java
new file mode 100644
index 00000000000..b65e2977a95
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamCogroupTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamCogroupTest {
+
+    private boolean sendOldValues = false;
+    private final KTableValueGetterSupplier<String, Long> parentValueGetterSupplier = new KTableValueGetterSupplier<String, Long>() {
+            @Override
+            public KTableValueGetter<String, Long> get() {
+                return null;
+            }
+
+            @Override
+            public String[] storeNames() {
+                return null;
+            }
+        };
+    private final KStreamAggProcessorSupplier parent = new KStreamAggProcessorSupplier<String, String, Change<Long>, Long>() {
+            @Override
+            public Processor<String, Change<Long>> get() {
+                return null;
+            }
+
+            @Override
+            public KTableValueGetterSupplier<String, Long> view() {
+                return parentValueGetterSupplier;
+            }
+
+            @Override
+            public void enableSendingOldValues() {
+                sendOldValues = true;
+            }
+        };
+    private final KStreamCogroup<String, Long> cogroup = new KStreamCogroup<String, Long>(Collections.singleton(parent));
+    private final Processor<String, Change<Long>> processor = cogroup.get();
+    private MockProcessorContext context;
+    private List<KeyValue> results = new ArrayList<>();
+
+    @Before
+    public void setup() {
+        context = new MockProcessorContext(null, Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache"), 100000, new MockStreamsMetrics(new Metrics()))) {
+                @Override
+                public <K, V> void forward(final K key, final V value) {
+                    results.add(KeyValue.pair(key, value));
+                }
+            };
+        processor.init(context);
+    }
+
+    @After
+    public void tearDown() {
+        results.clear();
+        sendOldValues = false;
+    }
+
+    @Test
+    public void shouldEnableSendingOldValuesOfParent() {
+        cogroup.enableSendingOldValues();
+        assertTrue(sendOldValues);
+    }
+
+    @Test
+    public void shouldReturnViewOfParent() {
+        final KTableValueGetterSupplier<String, Long> valueGetterSupplier = cogroup.view();
+        assertEquals(parentValueGetterSupplier, valueGetterSupplier);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPassChangeWithOldValueRemoved() {
+        processor.process("key", new Change<>(1L, 0L));
+        assertEquals(new KeyValue<>("key", new Change<>(1L, null)), (KeyValue<String, Change<Long>>) results.get(0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPassChangeUnchanged() {
+        cogroup.enableSendingOldValues();
+        processor.process("key", new Change<>(1L, 0L));
+        assertEquals(new KeyValue<>("key", new Change<>(1L, 0L)), (KeyValue<String, Change<Long>>) results.get(0));
+    }
+}


> Kafka Streams: Add Cogroup in the DSL
> -------------------------------------
>
>                 Key: KAFKA-6049
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6049
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Richard Yu
>            Priority: Major
>              Labels: api, kip, user-experience
>
> When multiple streams aggregate together to form a single larger object (for example, a shopping website may have a cart stream, a wish list stream, and a purchases stream that together make up a Customer), it is very difficult to accommodate this in the Kafka Streams DSL. It generally requires you to group and aggregate each stream into a KTable and then make multiple outer join calls to end up with a KTable holding your desired object. This creates a state store for each stream and a long chain of ValueJoiners that each new record must go through to reach the final object.
> Creating a cogroup method that uses a single state store will:
> * Reduce the number of gets from state stores. With multiple joins, when a new value arrives on any of the streams, a chain reaction occurs in which the join processors keep calling ValueGetters until all state stores have been accessed.
> * Slightly increase performance. As described above, all ValueGetters are called, which also causes all ValueJoiners to be called, forcing a recalculation of the current joined value of all other streams.
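
The single-store idea in the description above can be modeled without Kafka at all. What follows is a minimal plain-Java sketch, not the Kafka Streams API: `CogroupSketch`, `Input`, `tagging`, and `demo` are hypothetical names, the three aggregators mirror AGGREGATOR_1 to AGGREGATOR_3 from the integration test in the diff, and the empty-string initial aggregate is an assumption because INITIALIZER is not shown in this part of the diff. One map plays the role of the single state store, and each record is routed through the aggregator of the stream it arrived on, so every record costs one get and one put.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of cogrouping; none of these types are Kafka Streams classes.
class CogroupSketch {

    // One record: the input stream it arrived on (1, 2 or 3), its key, and its value.
    static final class Input {
        final int stream;
        final long key;
        final String value;

        Input(int stream, long key, String value) {
            this.stream = stream;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical per-stream aggregator: appends a stream tag and the value to the
    // current aggregate, in the same way as AGGREGATOR_1..3 in the integration test.
    static BinaryOperator<String> tagging(String tag) {
        return (aggregate, value) -> aggregate + tag + value;
    }

    // Routes each record through the aggregator of its stream against ONE shared store.
    static List<String> cogroup(List<Input> inputs,
                                Map<Integer, BinaryOperator<String>> aggregators) {
        Map<Long, String> store = new HashMap<>(); // the single state store
        List<String> emitted = new ArrayList<>();
        for (Input in : inputs) {
            String current = store.getOrDefault(in.key, ""); // assumed initializer: ""
            String updated = aggregators.get(in.stream).apply(current, in.value);
            store.put(in.key, updated); // one get and one put per record
            emitted.add(in.key + "=" + updated);
        }
        return emitted;
    }

    static List<String> demo() {
        Map<Integer, BinaryOperator<String>> aggregators = new HashMap<>();
        aggregators.put(1, tagging("1"));
        aggregators.put(2, tagging("2"));
        aggregators.put(3, tagging("3"));
        // First four records of INPUTS from the integration test.
        List<Input> inputs = Arrays.asList(
                new Input(1, 1L, "a"),
                new Input(2, 2L, "a"),
                new Input(3, 1L, "a"),
                new Input(1, 1L, "b"));
        return cogroup(inputs, aggregators);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1=1a, 2=2a, 1=1a3a, 1=1a3a1b]
    }
}
```

The printed aggregates match the first four entries that testCogroup expects for the same inputs. In the join-based approach the description contrasts this with, each of the three streams would need its own store, and an update on one stream would have to read the other two to rebuild the joined value.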
