You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/16 18:56:36 UTC

[1/3] kafka git commit: KAFKA-3561: Auto create through topic for KStream aggregation and join

Repository: kafka
Updated Branches:
  refs/heads/trunk 54ba2280f -> 7d9d1cb23


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streams/src/test/resources/log4j.properties b/streams/src/test/resources/log4j.properties
new file mode 100644
index 0000000..69f3d1c
--- /dev/null
+++ b/streams/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=WARN, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=WARN
\ No newline at end of file


[2/3] kafka git commit: KAFKA-3561: Auto create through topic for KStream aggregation and join

Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 0c94084..1f9a473 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -38,7 +38,7 @@ public interface PartitionGrouper {
      * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this
      * interface. See {@link DefaultPartitionGrouper} for more information.
      *
-     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups(String)} topic group} id to topics
+     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics
      * @param metadata Metadata of the consuming cluster
      * @return a map of task ids to groups of partitions
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 1743baf..19440e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -68,6 +68,8 @@ public class TopologyBuilder {
     private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
     private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+    private String applicationId;
+
     private Map<Integer, Set<String>> nodeGroups = null;
     private Pattern topicPattern;
 
@@ -601,8 +603,8 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
-        Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
+    public Map<Integer, TopicsInfo> topicGroups() {
+        Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
 
         if (subscriptionUpdates.hasUpdates()) {
@@ -629,6 +631,12 @@ public class TopologyBuilder {
                     // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
                         if (this.internalTopicNames.contains(topic)) {
+                            if (applicationId == null) {
+                                throw new TopologyBuilderException("There are internal topics and"
+                                                                   + " applicationId hasn't been "
+                                                                   + "set. Call setApplicationId "
+                                                                   + "first");
+                            }
                             // prefix the internal topic name with the application id
                             String internalTopic = applicationId + "-" + topic;
                             internalSourceTopics.add(internalTopic);
@@ -681,7 +689,7 @@ public class TopologyBuilder {
     }
 
     private Map<Integer, Set<String>> makeNodeGroups() {
-        HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
+        HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
         HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
         int nodeGroupId = 0;
@@ -739,13 +747,30 @@ public class TopologyBuilder {
             for (String node : nodeNames) {
                 String[] topics = nodeToSourceTopics.get(node);
                 if (topics != null)
-                    copartitionGroup.addAll(Arrays.asList(topics));
+                    copartitionGroup.addAll(convertInternalTopicNames(topics));
             }
             list.add(Collections.unmodifiableSet(copartitionGroup));
         }
         return Collections.unmodifiableList(list);
     }
 
+    private List<String> convertInternalTopicNames(String...topics) {
+        final List<String> topicNames = new ArrayList<>();
+        for (String topic : topics) {
+            if (internalTopicNames.contains(topic)) {
+                if (applicationId == null) {
+                    throw new TopologyBuilderException("there are internal topics "
+                                                       + "and applicationId hasn't been set. Call "
+                                                       + "setApplicationId first");
+                }
+                topicNames.add(applicationId + "-" + topic);
+            } else {
+                topicNames.add(topic);
+            }
+        }
+        return topicNames;
+    }
+
     /**
      * Build the topology for the specified topic group. This is called automatically when passing this builder into the
      * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
@@ -814,10 +839,15 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
-    public Set<String> sourceTopics(String applicationId) {
+    public Set<String> sourceTopics() {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {
+                if (applicationId == null) {
+                    throw new TopologyBuilderException("there are internal topics and "
+                                                       + "applicationId is null. Call "
+                                                       + "setApplicationId before sourceTopics");
+                }
                 topics.add(applicationId + "-" + topic);
             } else {
                 topics.add(topic);
@@ -849,4 +879,14 @@ public class TopologyBuilder {
     public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
         this.subscriptionUpdates = subscriptionUpdates;
     }
+
+    /**
+     * Set the applicationId. This is required before calling
+     * {@link #sourceTopics}, {@link #topicGroups} and {@link #copartitionGroups}.
+     * @param applicationId   the streams applicationId. Should be the same as set by
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
+     */
+    public void setApplicationId(String applicationId) {
+        this.applicationId = applicationId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index eb731be..07f9a1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -81,7 +81,9 @@ public class RecordCollector {
             if (partitions != null)
                 partition = partitioner.partition(record.key(), record.value(), partitions.size());
         }
-        this.producer.send(new ProducerRecord<>(record.topic(), partition, keyBytes, valBytes), callback);
+        this.producer.send(new ProducerRecord<>(record.topic(), partition, record.timestamp(),
+                                                keyBytes,
+                                                valBytes), callback);
     }
 
     public void flush() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index adefab9..4b52511 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
@@ -259,7 +260,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         streamThread.builder.updateSubscriptions(subscriptionUpdates);
-        this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
+        this.topicGroups = streamThread.builder.topicGroups();
 
         // ensure the co-partitioning topics within the group have the same number of partitions,
         // and enforce the number of partitions for those internal topics.
@@ -270,14 +271,15 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
             internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics);
         }
-        Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
 
-        ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, metadata);
 
-        // for those internal source topics that do not have co-partition enforcement,
+        // for all internal source topics
         // set the number of partitions to the maximum of the depending sub-topologies source topics
+        Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
+        Set<String> allInternalTopicNames = new HashSet<>();
         for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             Set<String> internalTopics = entry.getValue().interSourceTopics;
+            allInternalTopicNames.addAll(internalTopics);
             for (String internalTopic : internalTopics) {
                 Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
 
@@ -288,20 +290,41 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
                         if (otherSinkTopics.contains(internalTopic)) {
                             for (String topic : other.getValue().sourceTopics) {
-                                List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
-
-                                if (infos != null && infos.size() > numPartitions)
-                                    numPartitions = infos.size();
+                                Integer partitions = null;
+                                // It is possible the sourceTopic is another internal topic, e.g.,
+                                // map().join().join(map())
+                                if (allInternalTopicNames.contains(topic)) {
+                                    Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(topic);
+                                    if (taskIds != null) {
+                                        for (TaskId taskId : taskIds) {
+                                            partitions = taskId.partition;
+                                        }
+                                    }
+                                } else {
+                                    partitions = metadata.partitionCountForTopic(topic);
+                                }
+                                if (partitions != null && partitions > numPartitions) {
+                                    numPartitions = partitions;
+                                }
                             }
                         }
                     }
-
                     internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
+                    for (int partition = 0; partition < numPartitions; partition++) {
+                        internalPartitionInfos.put(new TopicPartition(internalTopic, partition),
+                                                   new PartitionInfo(internalTopic, partition, null, new Node[0], new Node[0]));
+                    }
                 }
             }
         }
 
-        Map<TopicPartition, PartitionInfo> internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
+
+        Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
+        ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups,
+                             metadata.withPartitions(internalPartitionInfos));
+
+
+        internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
         internalSourceTopicToTaskIds.clear();
 
         Cluster metadataWithInternalTopics = metadata;
@@ -469,10 +492,22 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
+        if (numPartitions == -1) {
+            for (String topic : internalTopics) {
+                if (copartitionGroup.contains(topic)) {
+                    Integer partitions = metadata.partitionCountForTopic(topic);
+                    if (partitions != null && partitions > numPartitions) {
+                        numPartitions = partitions;
+                    }
+                }
+            }
+        }
         // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
         for (String topic : internalTopics) {
-            if (copartitionGroup.contains(topic))
-                internalSourceTopicToTaskIds.put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+            if (copartitionGroup.contains(topic)) {
+                internalSourceTopicToTaskIds
+                    .put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 64127a8..d1ce40f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -161,7 +161,7 @@ public class StreamThread extends Thread {
         this.applicationId = applicationId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics(applicationId);
+        this.sourceTopics = builder.sourceTopics();
         this.topicPattern = builder.sourceTopicPattern();
         this.clientId = clientId;
         this.processId = processId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 809a238..b642b2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -135,12 +134,12 @@ public class InternalTopicIntegrationTest {
                 public Iterable<String> apply(String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
-            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+            }).groupBy(new KeyValueMapper<String, String, String>() {
                 @Override
-                public KeyValue<String, String> apply(String key, String value) {
-                    return new KeyValue<String, String>(value, value);
+                public String apply(String key, String value) {
+                    return value;
                 }
-            }).countByKey("Counts").toStream();
+            }).count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 9e9d366..ea216f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -205,12 +205,13 @@ public class JoinIntegrationTest {
                 }
             })
             // Compute the total per region by summing the individual click counts per region.
-            .reduceByKey(new Reducer<Long>() {
+            .groupByKey(stringSerde, longSerde)
+            .reduce(new Reducer<Long>() {
                 @Override
                 public Long apply(Long value1, Long value2) {
                     return value1 + value2;
                 }
-            }, stringSerde, longSerde, "ClicksPerRegionUnwindowed");
+            }, "ClicksPerRegionUnwindowed");
 
         // Write the (continuously updating) results to the output topic.
         clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
new file mode 100644
index 0000000..44e92f7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class KGroupedStreamIntegrationTest {
+
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
+        new EmbeddedSingleNodeKafkaCluster();
+    private static volatile int testNo = 0;
+    private KStreamBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String outputTopic;
+    private KGroupedStream<String, String> groupedStream;
+    private Reducer<String> reducer;
+    private Initializer<Integer> initializer;
+    private Aggregator<String, String, Integer> aggregator;
+    private KStream<Integer, String> stream;
+
+
+    @Before
+    public void before() {
+        testNo++;
+        builder = new KStreamBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        String applicationId = "kgrouped-stream-test-" +
+                       testNo;
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration
+            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kgrouped-stream-test");
+
+        KeyValueMapper<Integer, String, String>
+            mapper =
+            new KeyValueMapper<Integer, String, String>() {
+                @Override
+                public String apply(Integer key, String value) {
+                    return value;
+                }
+            };
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
+        groupedStream = stream
+            .groupBy(
+                mapper,
+                Serdes.String(),
+                Serdes.String());
+
+        reducer = new Reducer<String>() {
+            @Override
+            public String apply(String value1, String value2) {
+                return value1 + ":" + value2;
+            }
+        };
+        initializer = new Initializer<Integer>() {
+            @Override
+            public Integer apply() {
+                return 0;
+            }
+        };
+        aggregator = new Aggregator<String, String, Integer>() {
+            @Override
+            public Integer apply(String aggKey, String value, Integer aggregate) {
+                return aggregate + value.length();
+            }
+        };
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+
+    @Test
+    public void shouldReduce() throws Exception {
+        produceMessages(System.currentTimeMillis());
+        groupedStream
+            .reduce(reducer, "reduce-by-key")
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, String>> results = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 10);
+
+        Collections.sort(results, new Comparator<KeyValue<String, String>>() {
+            @Override
+            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
+                return KGroupedStreamIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
+                                             KeyValue.pair("A", "A:A"),
+                                             KeyValue.pair("B", "B"),
+                                             KeyValue.pair("B", "B:B"),
+                                             KeyValue.pair("C", "C"),
+                                             KeyValue.pair("C", "C:C"),
+                                             KeyValue.pair("D", "D"),
+                                             KeyValue.pair("D", "D:D"),
+                                             KeyValue.pair("E", "E"),
+                                             KeyValue.pair("E", "E:E"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
+                                                                            final KeyValue<K, V> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.compareTo(o2.value);
+        }
+        return keyComparison;
+    }
+
+    @Test
+    public void shouldReduceWindowed() throws Exception {
+        long firstBatchTimestamp = System.currentTimeMillis() - 1000;
+        produceMessages(firstBatchTimestamp);
+        long secondBatchTimestamp = System.currentTimeMillis();
+        produceMessages(secondBatchTimestamp);
+        produceMessages(secondBatchTimestamp);
+
+        groupedStream
+            .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L))
+            .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
+                @Override
+                public String apply(Windowed<String> windowedKey, String value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            })
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, String>> windowedOutput = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 15);
+
+        Comparator<KeyValue<String, String>>
+            comparator =
+            new Comparator<KeyValue<String, String>>() {
+                @Override
+                public int compare(final KeyValue<String, String> o1,
+                                   final KeyValue<String, String> o2) {
+                    return KGroupedStreamIntegrationTest.compare(o1, o2);
+                }
+            };
+
+        Collections.sort(windowedOutput, comparator);
+        long firstBatchWindow = firstBatchTimestamp / 500 * 500;
+        long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+
+        assertThat(windowedOutput, is(
+            Arrays.asList(
+                new KeyValue<>("A@" + firstBatchWindow, "A"),
+                new KeyValue<>("A@" + secondBatchWindow, "A"),
+                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
+                new KeyValue<>("B@" + firstBatchWindow, "B"),
+                new KeyValue<>("B@" + secondBatchWindow, "B"),
+                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
+                new KeyValue<>("C@" + firstBatchWindow, "C"),
+                new KeyValue<>("C@" + secondBatchWindow, "C"),
+                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
+                new KeyValue<>("D@" + firstBatchWindow, "D"),
+                new KeyValue<>("D@" + secondBatchWindow, "D"),
+                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
+                new KeyValue<>("E@" + firstBatchWindow, "E"),
+                new KeyValue<>("E@" + secondBatchWindow, "E"),
+                new KeyValue<>("E@" + secondBatchWindow, "E:E")
+            )
+        ));
+    }
+
+    /**
+     * Runs a non-windowed aggregate over {@code groupedStream}. One batch is produced before the
+     * topology starts and one after; the sorted output must contain the values 1 and 2 for each
+     * of the keys A through E.
+     */
+    @Test
+    public void shouldAggregate() throws Exception {
+        produceMessages(System.currentTimeMillis());
+
+        groupedStream
+            .aggregate(initializer, aggregator, Serdes.Integer(), "aggregate-by-selected-key")
+            .to(Serdes.String(), Serdes.Integer(), outputTopic);
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        final List<KeyValue<String, Integer>> aggregated =
+            receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 10);
+
+        // Sort first so the assertion does not depend on arrival order.
+        final Comparator<KeyValue<String, Integer>> ordering =
+            new Comparator<KeyValue<String, Integer>>() {
+                @Override
+                public int compare(final KeyValue<String, Integer> left,
+                                   final KeyValue<String, Integer> right) {
+                    return KGroupedStreamIntegrationTest.compare(left, right);
+                }
+            };
+        Collections.sort(aggregated, ordering);
+
+        final List<KeyValue<String, Integer>> expected = Arrays.asList(
+            KeyValue.pair("A", 1),
+            KeyValue.pair("A", 2),
+            KeyValue.pair("B", 1),
+            KeyValue.pair("B", 2),
+            KeyValue.pair("C", 1),
+            KeyValue.pair("C", 2),
+            KeyValue.pair("D", 1),
+            KeyValue.pair("D", 2),
+            KeyValue.pair("E", 1),
+            KeyValue.pair("E", 2));
+        assertThat(aggregated, is(expected));
+    }
+
+    /**
+     * Runs a windowed aggregate (500 ms windows) over {@code groupedStream}. One batch is stamped
+     * one second in the past and two batches share the current timestamp; the output keys are
+     * rendered as {@code key@windowStart}. Each key must show value 1 in the first window and
+     * values 1 and 2 in the second.
+     */
+    @Test
+    public void shouldAggregateWindowed() throws Exception {
+        final long windowSizeMs = 500L;
+
+        final long firstTimestamp = System.currentTimeMillis() - 1000;
+        produceMessages(firstTimestamp);
+        final long secondTimestamp = System.currentTimeMillis();
+        produceMessages(secondTimestamp);
+        produceMessages(secondTimestamp);
+
+        final KeyValueMapper<Windowed<String>, Integer, String> keyWithWindowStart =
+            new KeyValueMapper<Windowed<String>, Integer, String>() {
+                @Override
+                public String apply(final Windowed<String> windowedKey, final Integer value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            };
+
+        groupedStream
+            .aggregate(initializer,
+                       aggregator,
+                       TimeWindows.of("aggregate-by-key-windowed", windowSizeMs),
+                       Serdes.Integer())
+            .toStream(keyWithWindowStart)
+            .to(Serdes.String(), Serdes.Integer(), outputTopic);
+
+        startStreams();
+
+        final List<KeyValue<String, Integer>> windowedMessages =
+            receiveMessages(new StringDeserializer(), new IntegerDeserializer(), 15);
+
+        // Sort first so the assertion does not depend on arrival order.
+        Collections.sort(windowedMessages, new Comparator<KeyValue<String, Integer>>() {
+            @Override
+            public int compare(final KeyValue<String, Integer> left,
+                               final KeyValue<String, Integer> right) {
+                return KGroupedStreamIntegrationTest.compare(left, right);
+            }
+        });
+
+        // Round each timestamp down to the start of its window.
+        final long firstWindow = firstTimestamp / windowSizeMs * windowSizeMs;
+        final long secondWindow = secondTimestamp / windowSizeMs * windowSizeMs;
+
+        final List<KeyValue<String, Integer>> expected = Arrays.asList(
+            new KeyValue<>("A@" + firstWindow, 1),
+            new KeyValue<>("A@" + secondWindow, 1),
+            new KeyValue<>("A@" + secondWindow, 2),
+            new KeyValue<>("B@" + firstWindow, 1),
+            new KeyValue<>("B@" + secondWindow, 1),
+            new KeyValue<>("B@" + secondWindow, 2),
+            new KeyValue<>("C@" + firstWindow, 1),
+            new KeyValue<>("C@" + secondWindow, 1),
+            new KeyValue<>("C@" + secondWindow, 2),
+            new KeyValue<>("D@" + firstWindow, 1),
+            new KeyValue<>("D@" + secondWindow, 1),
+            new KeyValue<>("D@" + secondWindow, 2),
+            new KeyValue<>("E@" + firstWindow, 1),
+            new KeyValue<>("E@" + secondWindow, 1),
+            new KeyValue<>("E@" + secondWindow, 2));
+        assertThat(windowedMessages, is(expected));
+    }
+
+    /**
+     * Counts records per key on {@code groupedStream}. One batch is produced before the topology
+     * starts and one after; the sorted output must contain the counts 1 and 2 for each of the
+     * keys A through E.
+     */
+    @Test
+    public void shouldCount() throws Exception {
+        produceMessages(System.currentTimeMillis());
+
+        groupedStream
+            .count("count-by-key")
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        final List<KeyValue<String, Long>> counts =
+            receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
+
+        // Sort first so the assertion does not depend on arrival order.
+        final Comparator<KeyValue<String, Long>> ordering =
+            new Comparator<KeyValue<String, Long>>() {
+                @Override
+                public int compare(final KeyValue<String, Long> left,
+                                   final KeyValue<String, Long> right) {
+                    return KGroupedStreamIntegrationTest.compare(left, right);
+                }
+            };
+        Collections.sort(counts, ordering);
+
+        final List<KeyValue<String, Long>> expected = Arrays.asList(
+            KeyValue.pair("A", 1L),
+            KeyValue.pair("A", 2L),
+            KeyValue.pair("B", 1L),
+            KeyValue.pair("B", 2L),
+            KeyValue.pair("C", 1L),
+            KeyValue.pair("C", 2L),
+            KeyValue.pair("D", 1L),
+            KeyValue.pair("D", 2L),
+            KeyValue.pair("E", 1L),
+            KeyValue.pair("E", 2L));
+        assertThat(counts, is(expected));
+    }
+
+    /**
+     * Groups {@code stream} by its existing Integer key and counts per 500 ms window. Two batches
+     * share one timestamp, so each of the keys 1 through 5 must show the counts 1 and 2 in the
+     * same window; output keys are rendered as {@code key@windowStart}.
+     */
+    @Test
+    public void shouldGroupByKey() throws Exception {
+        final long windowSizeMs = 500L;
+
+        final long timestamp = System.currentTimeMillis();
+        produceMessages(timestamp);
+        produceMessages(timestamp);
+
+        final KeyValueMapper<Windowed<Integer>, Long, String> keyWithWindowStart =
+            new KeyValueMapper<Windowed<Integer>, Long, String>() {
+                @Override
+                public String apply(final Windowed<Integer> windowedKey, final Long value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            };
+
+        stream.groupByKey(Serdes.Integer(), Serdes.String())
+            .count(TimeWindows.of("count-windows", windowSizeMs))
+            .toStream(keyWithWindowStart)
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        startStreams();
+
+        final List<KeyValue<String, Long>> counts =
+            receiveMessages(new StringDeserializer(), new LongDeserializer(), 10);
+
+        // Sort first so the assertion does not depend on arrival order.
+        Collections.sort(counts, new Comparator<KeyValue<String, Long>>() {
+            @Override
+            public int compare(final KeyValue<String, Long> left,
+                               final KeyValue<String, Long> right) {
+                return KGroupedStreamIntegrationTest.compare(left, right);
+            }
+        });
+
+        // Round the shared timestamp down to the start of its window.
+        final long window = timestamp / windowSizeMs * windowSizeMs;
+        final List<KeyValue<String, Long>> expected = Arrays.asList(
+            KeyValue.pair("1@" + window, 1L),
+            KeyValue.pair("1@" + window, 2L),
+            KeyValue.pair("2@" + window, 1L),
+            KeyValue.pair("2@" + window, 2L),
+            KeyValue.pair("3@" + window, 1L),
+            KeyValue.pair("3@" + window, 2L),
+            KeyValue.pair("4@" + window, 1L),
+            KeyValue.pair("4@" + window, 2L),
+            KeyValue.pair("5@" + window, 1L),
+            KeyValue.pair("5@" + window, 2L));
+        assertThat(counts, is(expected));
+    }
+
+
+    /**
+     * Synchronously writes the five records (1,"A") through (5,"E") to {@code streamOneInput},
+     * all stamped with the given timestamp.
+     *
+     * @param timestamp record timestamp passed through to the producing utility
+     */
+    private void produceMessages(long timestamp)
+        throws ExecutionException, InterruptedException {
+        final List<KeyValue<Integer, String>> records = Arrays.asList(
+            new KeyValue<>(1, "A"),
+            new KeyValue<>(2, "B"),
+            new KeyValue<>(3, "C"),
+            new KeyValue<>(4, "D"),
+            new KeyValue<>(5, "E"));
+        final Properties producerConfig = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            IntegerSerializer.class,
+            StringSerializer.class,
+            new Properties());
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            streamOneInput, records, producerConfig, timestamp);
+    }
+
+
+    /**
+     * Derives the per-test input and output topic names from {@code testNo} and creates both
+     * topics on the embedded cluster.
+     */
+    private void createTopics() {
+        // NOTE(review): the extra arguments are presumably partition count and replication
+        // factor; confirm against EmbeddedSingleNodeKafkaCluster.
+        streamOneInput = "stream-one-" + testNo;
+        CLUSTER.createTopic(streamOneInput, 3, 1);
+
+        outputTopic = "output-" + testNo;
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    /**
+     * Builds a {@link KafkaStreams} instance from the current builder and configuration, stores
+     * it in {@code kafkaStreams}, and starts it.
+     */
+    private void startStreams() {
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = streams;
+        streams.start();
+    }
+
+
+    /**
+     * Reads key/value records from {@code outputTopic} with a fresh consumer configuration.
+     *
+     * @param keyDeserializer   instance whose class is configured as the key deserializer
+     * @param valueDeserializer instance whose class is configured as the value deserializer
+     * @param numMessages       record count handed to the waiting utility
+     * @return the records returned by
+     *         {@code IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived}
+     */
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
+                                                        final int numMessages)
+        throws InterruptedException {
+        final String groupId = "kgroupedstream-test-" + testNo;
+        final String keyDeserializerClass = keyDeserializer.getClass().getName();
+        final String valueDeserializerClass = valueDeserializer.getClass().getName();
+
+        final Properties config = new Properties();
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
+        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+
+        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            config, outputTopic, numMessages, 60 * 1000);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
new file mode 100644
index 0000000..221d349
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class KStreamRepartitionJoinTest {
+
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
+        new EmbeddedSingleNodeKafkaCluster();
+
+    private static volatile int testNo = 0;
+
+    private KStreamBuilder builder;
+    private Properties streamsConfiguration;
+    private KStream<Long, Integer> streamOne;
+    private KStream<Integer, String> streamTwo;
+    private KStream<Integer, Integer> streamThree;
+    private KStream<Integer, String> streamFour;
+    private KTable<Integer, String> kTable;
+    private ValueJoiner<Integer, String, String> valueJoiner;
+    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>
+        keyMapper;
+
+    private final List<String>
+        expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String streamTwoInput;
+    private String streamFourInput;
+    private String tableInput;
+    private String outputTopic;
+    private String streamThreeInput;
+
+
+
+    /**
+     * Per-test setup: bumps {@code testNo}, creates the topics for this test, assembles the
+     * streams configuration, registers the four source streams and the table on a fresh builder,
+     * and creates the shared value joiner and key mapper.
+     */
+    @Before
+    public void before() {
+        testNo++;
+        builder = new KStreamBuilder();
+        createTopics();
+
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-repartition-join-test" + testNo);
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstream-repartition-test");
+        streamsConfiguration = config;
+
+        // Sources are registered in the same order as before: one, two, three, four, table.
+        streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
+        streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
+        streamThree = builder.stream(Serdes.Integer(), Serdes.Integer(), streamThreeInput);
+        streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
+        kTable = builder.table(Serdes.Integer(), Serdes.String(), tableInput);
+
+        // Joins an Integer and a String as "left:right".
+        valueJoiner = new ValueJoiner<Integer, String, String>() {
+            @Override
+            public String apply(final Integer left, final String right) {
+                return left + ":" + right;
+            }
+        };
+
+        // Re-keys a record by its value, discarding the original Long key.
+        keyMapper = new KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>() {
+            @Override
+            public KeyValue<Integer, Integer> apply(final Long ignoredKey, final Integer value) {
+                return new KeyValue<>(value, value);
+            }
+        };
+    }
+
+    /**
+     * Per-test teardown: closes the streams instance if one was started, then purges the local
+     * streams state for this test's configuration.
+     */
+    @After
+    public void whenShuttingDown() throws IOException {
+        final KafkaStreams started = kafkaStreams;
+        if (started != null) {
+            started.close();
+        }
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    /**
+     * Re-keys stream one via {@code keyMapper}, joins it with stream two, and expects the
+     * standard stream-one/stream-two join output.
+     */
+    @Test
+    public void shouldMapStreamOneAndJoin() throws ExecutionException, InterruptedException {
+        produceMessages();
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+        doJoin(rekeyedOne, streamTwo);
+
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    /**
+     * Applies a map to both inputs (re-keying stream one, identity mapping on stream two) before
+     * joining them, and expects the standard stream-one/stream-two join output.
+     */
+    @Test
+    public void shouldMapBothStreamsAndJoin() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+
+        // Identity mapping: same key and value, but still a map step on the stream.
+        final KeyValueMapper<Integer, String, KeyValue<Integer, String>> identity =
+            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+                @Override
+                public KeyValue<Integer, String> apply(final Integer key, final String value) {
+                    return new KeyValue<>(key, value);
+                }
+            };
+        final KStream<Integer, String> remappedTwo = streamTwo.map(identity);
+
+        doJoin(rekeyedOne, remappedTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    /**
+     * Chains two map steps on stream one (first shifting the Long key by the value, then
+     * re-keying via {@code keyMapper}) before joining with stream two.
+     */
+    @Test
+    public void shouldMapMapJoin() throws Exception {
+        produceMessages();
+
+        final KeyValueMapper<Long, Integer, KeyValue<Long, Integer>> shiftKeyByValue =
+            new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
+                @Override
+                public KeyValue<Long, Integer> apply(final Long key, final Integer value) {
+                    return new KeyValue<>(key + value, value);
+                }
+            };
+        final KStream<Integer, Integer> mapMapStream =
+            streamOne.map(shiftKeyByValue).map(keyMapper);
+
+        doJoin(mapMapStream, streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+
+    /**
+     * Re-keys stream one with {@code selectKey} (the value becomes the key) and joins the result
+     * with stream two.
+     */
+    @Test
+    public void shouldSelectKeyAndJoin() throws ExecutionException, InterruptedException {
+        produceMessages();
+
+        final KeyValueMapper<Long, Integer, Integer> valueAsKey =
+            new KeyValueMapper<Long, Integer, Integer>() {
+                @Override
+                public Integer apply(final Long ignoredKey, final Integer value) {
+                    return value;
+                }
+            };
+        final KStream<Integer, Integer> keySelected = streamOne.selectKey(valueAsKey);
+
+        doJoin(keySelected, streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+
+    /**
+     * Re-keys stream one with {@code flatMap} (each record becomes a single record keyed by its
+     * value) and joins the result with stream two.
+     */
+    @Test
+    public void shouldFlatMapJoin() throws Exception {
+        produceMessages();
+
+        final KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>> toSingleton =
+            new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
+                @Override
+                public Iterable<KeyValue<Integer, Integer>> apply(final Long ignoredKey,
+                                                                  final Integer value) {
+                    return Collections.singletonList(new KeyValue<>(value, value));
+                }
+            };
+        final KStream<Integer, Integer> flatMapped = streamOne.flatMap(toSingleton);
+
+        doJoin(flatMapped, streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    /**
+     * Joins stream three with stream two directly, with no key-changing step in between, and
+     * expects the values 10..50 paired with A..E.
+     */
+    @Test
+    public void shouldJoinTwoStreamsPartitionedTheSame() throws Exception {
+        produceMessages();
+        doJoin(streamThree, streamTwo);
+        startStreams();
+
+        final List<String> expected = Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E");
+        verifyCorrectOutput(expected);
+    }
+
+    /**
+     * Joins stream two (left) with the re-keyed stream one (right), so the key-changing map is
+     * on the right-hand side of the join. Expects the String value first: "A:1" .. "E:5".
+     */
+    @Test
+    public void shouldJoinWithRhsStreamMapped() throws Exception {
+        produceMessages();
+
+        final ValueJoiner<String, Integer, String> stringThenInt =
+            new ValueJoiner<String, Integer, String>() {
+                @Override
+                public String apply(final String left, final Integer right) {
+                    return left + ":" + right;
+                }
+            };
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+        final KStream<Integer, String> joined = streamTwo.join(
+            rekeyedOne,
+            stringThenInt,
+            JoinWindows.of("the-join").within(60 * 1000),
+            Serdes.Integer(),
+            Serdes.String(),
+            Serdes.Integer());
+        joined.to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+        verifyCorrectOutput(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"));
+    }
+
+    /**
+     * Left-joins stream three with stream two directly, with no key-changing step in between,
+     * and expects the values 10..50 paired with A..E.
+     */
+    @Test
+    public void shouldLeftJoinTwoStreamsPartitionedTheSame() throws Exception {
+        produceMessages();
+        doLeftJoin(streamThree, streamTwo);
+        startStreams();
+
+        final List<String> expected = Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E");
+        verifyCorrectOutput(expected);
+    }
+
+    /**
+     * Re-keys stream one via {@code keyMapper}, left-joins it with stream two, and expects the
+     * standard stream-one/stream-two join output.
+     */
+    @Test
+    public void shouldMapStreamOneAndLeftJoin() throws ExecutionException, InterruptedException {
+        produceMessages();
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+        doLeftJoin(rekeyedOne, streamTwo);
+
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    /**
+     * Applies a map to both inputs and left-joins them. If the first five results already match
+     * the expected join output the test is done; otherwise stream one is produced again and the
+     * next read must match.
+     */
+    @Test
+    public void shouldMapBothStreamsAndLeftJoin() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+
+        // Identity mapping: same key and value, but still a map step on the stream.
+        final KeyValueMapper<Integer, String, KeyValue<Integer, String>> identity =
+            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+                @Override
+                public KeyValue<Integer, String> apply(final Integer key, final String value) {
+                    return new KeyValue<>(key, value);
+                }
+            };
+        final KStream<Integer, String> remappedTwo = streamTwo.map(identity);
+
+        doLeftJoin(rekeyedOne, remappedTwo);
+        startStreams();
+
+        final List<String> firstRead = receiveMessages(new StringDeserializer(), 5);
+        if (firstRead.equals(expectedStreamOneTwoJoin)) {
+            return;
+        }
+
+        // First read did not match: feed the left side again and re-check.
+        produceToStreamOne();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    /**
+     * Left-joins stream two (left) with the re-keyed stream one (right). If the first five
+     * results already match "A:1" .. "E:5" the test is done; otherwise stream two's input is
+     * produced again and the next read must match.
+     */
+    @Test
+    public void shouldLeftJoinWithRhsStreamMapped() throws Exception {
+        produceMessages();
+
+        final ValueJoiner<String, Integer, String> stringThenInt =
+            new ValueJoiner<String, Integer, String>() {
+                @Override
+                public String apply(final String left, final Integer right) {
+                    return left + ":" + right;
+                }
+            };
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+        // The left-value serde is passed as null here, exactly as in the original call.
+        final KStream<Integer, String> joined = streamTwo.leftJoin(
+            rekeyedOne,
+            stringThenInt,
+            JoinWindows.of("the-join").within(60 * 1000),
+            Serdes.Integer(),
+            null,
+            Serdes.Integer());
+        joined.to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        final List<String> expectedMessages = Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5");
+        final List<String> firstRead = receiveMessages(new StringDeserializer(), 5);
+        if (firstRead.equals(expectedMessages)) {
+            return;
+        }
+
+        // First read did not match: feed the left side again and re-check.
+        produceStreamTwoInputTo(streamTwoInput);
+        verifyCorrectOutput(expectedMessages);
+    }
+
+    /**
+     * Re-keys stream one via {@code keyMapper} and left-joins it against {@code kTable},
+     * expecting the standard stream-one/stream-two join output.
+     */
+    @Test
+    public void shouldLeftJoinWithKTableAfterMap() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+        final KStream<Integer, String> joined =
+            rekeyedOne.leftJoin(kTable, valueJoiner, Serdes.Integer(), Serdes.Integer());
+        joined.to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        final List<String> actual = receiveMessages(new StringDeserializer(), 5);
+        assertThat(actual, is(expectedStreamOneTwoJoin));
+    }
+
+    /**
+     * Builds a table by re-keying stream one, grouping by key and aggregating the values into a
+     * ":"-prefixed string, then left-joins stream two against that table. After an initial read,
+     * stream two's input is produced again and the following read must be "A@:1" .. "E@:5".
+     */
+    @Test
+    public void shouldLeftJoinWithTableProducedFromGroupBy() throws Exception {
+        produceMessages();
+
+        final Initializer<String> emptyString = new Initializer<String>() {
+            @Override
+            public String apply() {
+                return "";
+            }
+        };
+        final Aggregator<Integer, Integer, String> appendValue =
+            new Aggregator<Integer, Integer, String>() {
+                @Override
+                public String apply(final Integer aggKey, final Integer value,
+                                    final String aggregate) {
+                    return aggregate + ":" + value;
+                }
+            };
+        final KTable<Integer, String> aggTable =
+            streamOne.map(keyMapper)
+                .groupByKey(Serdes.Integer(), Serdes.Integer())
+                .aggregate(emptyString, appendValue, Serdes.String(), "agg-by-key");
+
+        final ValueJoiner<String, String, String> joinWithAt =
+            new ValueJoiner<String, String, String>() {
+                @Override
+                public String apply(final String left, final String right) {
+                    return left + "@" + right;
+                }
+            };
+        streamTwo.leftJoin(aggTable, joinWithAt, Serdes.Integer(), Serdes.String())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        // The first read is discarded; only the read after re-producing stream two is asserted.
+        receiveMessages(new StringDeserializer(), 5);
+        produceStreamTwoInputTo(streamTwoInput);
+        final List<String> actual = receiveMessages(new StringDeserializer(), 5);
+
+        final List<String> expected = Arrays.asList("A@:1", "B@:2", "C@:3", "D@:4", "E@:5");
+        assertThat(actual, is(expected));
+    }
+
+
+    /**
+     * Joins the mapped streams one and two, maps the join result again, and joins it with the
+     * mapped stream four. Expects "1:A:A" .. "5:E:E".
+     */
+    @Test
+    public void shouldJoinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
+        produceMessages();
+
+        // Identity mapping reused for stream two, the first join's output, and stream four.
+        final KeyValueMapper<Integer, String, KeyValue<Integer, String>> identity =
+            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+                @Override
+                public KeyValue<Integer, String> apply(final Integer key, final String value) {
+                    return new KeyValue<>(key, value);
+                }
+            };
+        final ValueJoiner<String, String, String> colonJoiner =
+            new ValueJoiner<String, String, String>() {
+                @Override
+                public String apply(final String left, final String right) {
+                    return left + ":" + right;
+                }
+            };
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+        final KStream<Integer, String> remappedTwo = streamTwo.map(identity);
+
+        final KStream<Integer, String> firstJoin = rekeyedOne.join(
+            remappedTwo,
+            valueJoiner,
+            JoinWindows.of("the-join").within(60 * 1000),
+            Serdes.Integer(),
+            Serdes.Integer(),
+            Serdes.String());
+
+        final KStream<Integer, String> remappedJoin = firstJoin.map(identity);
+        final KStream<Integer, String> remappedFour = streamFour.map(identity);
+        remappedJoin
+            .join(remappedFour,
+                  colonJoiner,
+                  JoinWindows.of("the-other-join").within(60 * 1000),
+                  Serdes.Integer(),
+                  Serdes.String(),
+                  Serdes.String())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+        verifyCorrectOutput(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"));
+    }
+
+    /**
+     * Adds one extra record with a null value to stream one. Because {@code keyMapper} uses the
+     * value as the new key, that record ends up with a null key after the map. The join output
+     * is still expected to be exactly the standard stream-one/stream-two result.
+     */
+    @Test
+    public void shouldFilterNullKeysWhenRepartionedOnJoin() throws Exception {
+        produceMessages();
+
+        final Properties producerConfig = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            LongSerializer.class,
+            IntegerSerializer.class,
+            new Properties());
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOneInput,
+            Collections.singleton(new KeyValue<Long, Integer>(70L, null)),
+            producerConfig);
+
+        final KStream<Integer, Integer> rekeyedOne = streamOne.map(keyMapper);
+        doJoin(rekeyedOne, streamTwo);
+
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    /**
+     * Seeds every input topic used by the tests: stream one, stream two, stream three, the
+     * table topic, and stream four. The table and stream four receive the same records as
+     * stream two.
+     */
+    private void produceMessages()
+        throws ExecutionException, InterruptedException {
+        produceToStreamOne();
+        produceStreamTwoInputTo(streamTwoInput);
+        produceToStreamThree();
+
+        // Same (Integer, String) payload as stream two, written to two more topics.
+        produceStreamTwoInputTo(tableInput);
+        produceStreamTwoInputTo(streamFourInput);
+    }
+
+    /**
+     * Synchronously writes the records (1,10) through (5,50) to {@code streamThreeInput} using
+     * Integer serializers for both key and value.
+     */
+    private void produceToStreamThree()
+        throws ExecutionException, InterruptedException {
+        final List<KeyValue<Integer, Integer>> records = Arrays.asList(
+            new KeyValue<>(1, 10),
+            new KeyValue<>(2, 20),
+            new KeyValue<>(3, 30),
+            new KeyValue<>(4, 40),
+            new KeyValue<>(5, 50));
+        final Properties producerConfig = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            IntegerSerializer.class,
+            IntegerSerializer.class,
+            new Properties());
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamThreeInput, records, producerConfig);
+    }
+
+    /**
+     * Synchronously writes the records (1,"A") through (5,"E") to the given topic using an
+     * Integer key serializer and a String value serializer.
+     *
+     * @param streamTwoInput name of the topic to write to (not necessarily stream two's own
+     *                       topic; callers also pass the table and stream-four topics)
+     */
+    private void produceStreamTwoInputTo(final String streamTwoInput)
+        throws ExecutionException, InterruptedException {
+        final List<KeyValue<Integer, String>> records = Arrays.asList(
+            new KeyValue<>(1, "A"),
+            new KeyValue<>(2, "B"),
+            new KeyValue<>(3, "C"),
+            new KeyValue<>(4, "D"),
+            new KeyValue<>(5, "E"));
+        final Properties producerConfig = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            IntegerSerializer.class,
+            StringSerializer.class,
+            new Properties());
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamTwoInput, records, producerConfig);
+    }
+
+    /**
+     * Synchronously writes five (Long, Integer) records to {@code streamOneInput}. The values
+     * are 1 through 5; the Long keys are unrelated to them.
+     */
+    private void produceToStreamOne()
+        throws ExecutionException, InterruptedException {
+        final List<KeyValue<Long, Integer>> records = Arrays.asList(
+            new KeyValue<>(10L, 1),
+            new KeyValue<>(5L, 2),
+            new KeyValue<>(12L, 3),
+            new KeyValue<>(15L, 4),
+            new KeyValue<>(20L, 5));
+        final Properties producerConfig = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            LongSerializer.class,
+            IntegerSerializer.class,
+            new Properties());
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOneInput, records, producerConfig);
+    }
+
+    /**
+     * Derives all per-test topic names from {@code testNo} and creates the topics on the
+     * embedded cluster, in the order: stream one, two, three, four, table, output.
+     */
+    private void createTopics() {
+        final String suffix = "-" + testNo;
+        streamOneInput = "stream-one" + suffix;
+        streamTwoInput = "stream-two" + suffix;
+        streamThreeInput = "stream-three" + suffix;
+        streamFourInput = "stream-four" + suffix;
+        tableInput = "table-stream-two" + suffix;
+        outputTopic = "output" + suffix;
+
+        // NOTE(review): the extra (2, 1) arguments are presumably partition count and
+        // replication factor; confirm against EmbeddedSingleNodeKafkaCluster.
+        CLUSTER.createTopic(streamOneInput);
+        CLUSTER.createTopic(streamTwoInput, 2, 1);
+        CLUSTER.createTopic(streamThreeInput, 2, 1);
+        CLUSTER.createTopic(streamFourInput);
+        CLUSTER.createTopic(tableInput, 2, 1);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+
+    /**
+     * Builds a {@link KafkaStreams} instance from the current builder and configuration, stores
+     * it in {@code kafkaStreams}, and starts it.
+     */
+    private void startStreams() {
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams = streams;
+        streams.start();
+    }
+
+
+    /**
+     * Reads values from {@code outputTopic} and returns them sorted in natural String order.
+     * Keys are always read with an Integer deserializer.
+     *
+     * @param valueDeserializer instance whose class is configured as the value deserializer
+     * @param numMessages       record count handed to the waiting utility
+     * @return the received values, sorted
+     */
+    private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
+                                         final int numMessages) throws InterruptedException {
+        final String groupId = "kstream-test-" + testNo;
+        final String keyDeserializerClass = IntegerDeserializer.class.getName();
+        final String valueDeserializerClass = valueDeserializer.getClass().getName();
+
+        final Properties consumerConfig = new Properties();
+        consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                                   CLUSTER.bootstrapServers());
+        consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                   keyDeserializerClass);
+        consumerConfig.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                   valueDeserializerClass);
+
+        final List<String> values = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
+            consumerConfig, outputTopic, numMessages, 60 * 1000);
+        Collections.sort(values);
+        return values;
+    }
+
+    /**
+     * Reads as many String values from the output topic as {@code expectedMessages} contains
+     * and asserts that the sorted result equals the expected list.
+     *
+     * @param expectedMessages expected values, in sorted order
+     */
+    private void verifyCorrectOutput(List<String> expectedMessages) throws InterruptedException {
+        final List<String> actual =
+            receiveMessages(new StringDeserializer(), expectedMessages.size());
+        assertThat(actual, is(expectedMessages));
+    }
+
+    /**
+     * Joins {@code lhs} with {@code rhs} using the shared {@code valueJoiner} and the
+     * "the-join" window, and writes the result to {@code outputTopic}.
+     *
+     * @param lhs left-hand stream of Integer values
+     * @param rhs right-hand stream of String values
+     */
+    private void doJoin(KStream<Integer, Integer> lhs,
+                        KStream<Integer, String> rhs) {
+        final JoinWindows window = JoinWindows.of("the-join").within(60 * 1000);
+        final KStream<Integer, String> joined = lhs.join(
+            rhs, valueJoiner, window, Serdes.Integer(), Serdes.Integer(), Serdes.String());
+        joined.to(Serdes.Integer(), Serdes.String(), outputTopic);
+    }
+
+    /**
+     * Left-joins {@code lhs} with {@code rhs} using the shared {@code valueJoiner} and the
+     * "the-join" window, and writes the result to {@code outputTopic}.
+     *
+     * @param lhs left-hand stream of Integer values
+     * @param rhs right-hand stream of String values
+     */
+    private void doLeftJoin(KStream<Integer, Integer> lhs,
+                            KStream<Integer, String> rhs) {
+        final JoinWindows window = JoinWindows.of("the-join").within(60 * 1000);
+        final KStream<Integer, String> joined = lhs.leftJoin(
+            rhs, valueJoiner, window, Serdes.Integer(), Serdes.Integer(), Serdes.String());
+        joined.to(Serdes.Integer(), Serdes.String(), outputTopic);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
index e00cd13..2966590 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -36,6 +36,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
@@ -56,7 +58,7 @@ public class WordCountIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC, 2, 1);
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
     }
 
@@ -65,9 +67,9 @@ public class WordCountIntegrationTest {
         List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
         List<KeyValue<String, Long>> expectedWordCounts = Arrays.asList(
             new KeyValue<>("hello", 1L),
+            new KeyValue<>("hello", 2L),
             new KeyValue<>("world", 1L),
             new KeyValue<>("world", 2L),
-            new KeyValue<>("hello", 2L),
             new KeyValue<>("world", 3L)
         );
 
@@ -101,12 +103,12 @@ public class WordCountIntegrationTest {
                 public Iterable<String> apply(String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
-            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+            }).groupBy(new KeyValueMapper<String, String, String>() {
                 @Override
-                public KeyValue<String, String> apply(String key, String value) {
-                    return new KeyValue<String, String>(value, value);
+                public String apply(final String key, final String value) {
+                    return value;
                 }
-            }).countByKey("Counts")
+            }).count("Counts")
             .toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
@@ -139,6 +141,16 @@ public class WordCountIntegrationTest {
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
         List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
             DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size());
+        Collections.sort(actualWordCounts, new Comparator<KeyValue<String, Long>>() {
+            @Override
+            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
+                int keyComparison = o1.key.compareTo(o2.key);
+                if (keyComparison == 0) {
+                    return o1.value.compareTo(o2.value);
+                }
+                return keyComparison;
+            }
+        });
         streams.close();
         assertThat(actualWordCounts, equalTo(expectedWordCounts));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index c3f9089..83b431c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -134,10 +134,21 @@ public class IntegrationTestUtils {
     public static <K, V> void produceKeyValuesSynchronously(
         String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
         throws ExecutionException, InterruptedException {
+        produceKeyValuesSynchronouslyWithTimestamp(topic,
+                                                   records,
+                                                   producerConfig,
+                                                   null);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic,
+                                                                         Collection<KeyValue<K, V>> records,
+                                                                         Properties producerConfig,
+                                                                         Long timestamp)
+        throws ExecutionException, InterruptedException {
         Producer<K, V> producer = new KafkaProducer<>(producerConfig);
         for (KeyValue<K, V> record : records) {
             Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, record.key, record.value));
+                new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
             f.get();
         }
         producer.flush();
@@ -226,4 +237,5 @@ public class IntegrationTestUtils {
             Thread.sleep(Math.min(waitTime, 100L));
         }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 65a4b54..1a608a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -78,7 +78,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test")
+            .within(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -156,7 +157,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test")
+            .within(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 2c6108b..8bc9a77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -17,14 +17,12 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -144,19 +142,5 @@ public class KStreamKTableLeftJoinTest {
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
     }
 
-    @Test(expected = KafkaException.class)
-    public void testNotJoinable() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<Integer, String> stream;
-        KTable<Integer, String> table;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.stream(intSerde, stringSerde, topic1).map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
-        table = builder.table(intSerde, stringSerde, topic2);
-
-        stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index f4fe3a6..db533e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -71,10 +71,12 @@ public class KStreamWindowAggregateTest {
 
             KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
             KTable<Windowed<String>, String> table2 =
-                stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                    TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
-                    strSerde,
-                    strSerde);
+                stream1.groupByKey(strSerde,
+                                   strSerde)
+                    .aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.STRING_ADDER,
+                               TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+                               strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);
@@ -149,20 +151,22 @@ public class KStreamWindowAggregateTest {
 
             KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
             KTable<Windowed<String>, String> table1 =
-                stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                    TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
-                    strSerde,
-                    strSerde);
+                stream1.groupByKey(strSerde, strSerde)
+                    .aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.STRING_ADDER,
+                               TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+                               strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
             table1.toStream().process(proc1);
 
             KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
             KTable<Windowed<String>, String> table2 =
-                stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                    TimeWindows.of("topic2-Canonized", 10).advanceBy(5),
-                    strSerde,
-                    strSerde);
+                stream2.groupByKey(strSerde, strSerde)
+                    .aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.STRING_ADDER,
+                               TimeWindows.of("topic2-Canonized", 10).advanceBy(5),
+                               strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 28acf09..107d832 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -139,7 +139,7 @@ public class TopologyBuilderTest {
     @Test
     public void testSourceTopics() {
         final TopologyBuilder builder = new TopologyBuilder();
-
+        builder.setApplicationId("X");
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
@@ -150,7 +150,7 @@ public class TopologyBuilderTest {
         expected.add("topic-2");
         expected.add("X-topic-3");
 
-        assertEquals(expected, builder.sourceTopics("X"));
+        assertEquals(expected, builder.sourceTopics());
     }
 
     @Test
@@ -259,7 +259,7 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
@@ -277,7 +277,7 @@ public class TopologyBuilderTest {
     @Test
     public void testTopicGroupsByStateStore() {
         final TopologyBuilder builder = new TopologyBuilder();
-
+        builder.setApplicationId("X");
         builder.addSource("source-1", "topic-1", "topic-1x");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
@@ -297,7 +297,7 @@ public class TopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17bda54..4f7037c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -269,9 +269,9 @@ public class StreamPartitionAssignorTest {
     @Test
     public void testAssignWithStates() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
-
+        String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
-
+        builder.setApplicationId(applicationId);
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
 
@@ -295,10 +295,11 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -474,8 +475,9 @@ public class StreamPartitionAssignorTest {
     @Test
     public void testAssignWithInternalTopics() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
-
+        String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
         builder.addSource("source1", "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
@@ -489,10 +491,11 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
         MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
@@ -501,13 +504,55 @@ public class StreamPartitionAssignorTest {
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, subscriptions);
 
         // check prepared internal topics
         assertEquals(1, internalTopicManager.readyTopics.size());
         assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
     }
 
+    @Test
+    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+        String applicationId = "test";
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
+        builder.addInternalTopic("topicX");
+        builder.addSource("source1", "topic1");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+        builder.addSink("sink1", "topicX", "processor1");
+        builder.addSource("source2", "topicX");
+        builder.addInternalTopic("topicZ");
+        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
+        builder.addSink("sink2", "topicZ", "processor2");
+        builder.addSource("source3", "topicZ");
+        List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+        UUID uuid1 = UUID.randomUUID();
+        String client1 = "client1";
+
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
+        partitionAssignor.setInternalTopicManager(internalTopicManager);
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put("consumer10",
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check prepared internal topics
+        assertEquals(2, internalTopicManager.readyTopics.size());
+        assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ"));
+    }
+
     private class MockInternalTopicManager extends InternalTopicManager {
 
         public Map<String, Integer> readyTopics = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index fbe7754..1e1e3f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -106,7 +107,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
 
         // min
-        data.aggregateByKey(
+        KGroupedStream<String, Integer>
+            groupedData =
+            data.groupByKey(stringSerde, intSerde);
+
+        groupedData.aggregate(
                 new Initializer<Integer>() {
                     public Integer apply() {
                         return Integer.MAX_VALUE;
@@ -119,7 +124,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("uwin-min"),
-                stringSerde,
                 intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
@@ -129,7 +133,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
 
         // max
-        data.aggregateByKey(
+        groupedData.aggregate(
                 new Initializer<Integer>() {
                     public Integer apply() {
                         return Integer.MIN_VALUE;
@@ -142,7 +146,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("uwin-max"),
-                stringSerde,
                 intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
@@ -152,7 +155,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
 
         // sum
-        data.aggregateByKey(
+        groupedData.aggregate(
                 new Initializer<Long>() {
                     public Long apply() {
                         return 0L;
@@ -165,7 +168,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("win-sum"),
-                stringSerde,
                 longSerde
         ).toStream().map(
                 new Unwindow<String, Long>()
@@ -176,10 +178,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
 
         // cnt
-        data.countByKey(
-                UnlimitedWindows.of("uwin-cnt"),
-                stringSerde
-        ).toStream().map(
+        groupedData.count(UnlimitedWindows.of("uwin-cnt"))
+            .toStream().map(
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "cnt");
 
@@ -206,10 +206,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, doubleSerde, "avg");
 
         // windowed count
-        data.countByKey(
-                TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE),
-                stringSerde
-        ).toStream().map(
+        groupedData.count(TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE))
+            .toStream().map(
                 new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                     @Override
                     public KeyValue<String, Long> apply(Windowed<String> key, Long value) {


[3/3] kafka git commit: KAFKA-3561: Auto create through topic for KStream aggregation and join

Posted by gu...@apache.org.
KAFKA-3561: Auto create through topic for KStream aggregation and join

guozhangwang enothereska mjsax miguno

If you get a chance, can you please take a look at this? I've done the repartitioning in the join, but it results in 2 internal topics for each join. This seems like overkill, as sometimes we wouldn't need to repartition at all, other times just 1 topic, and sometimes both, but I'm not sure how we can know that.

I'd also need to implement something similar for leftJoin, but again, I'd like to see if I'm heading down the right path or if anyone has any other bright ideas.

For reference - https://github.com/apache/kafka/pull/1453 - the previous PR

Thanks for taking the time and looking forward to getting some welcome advice :-)

Author: Damian Guy <da...@gmail.com>
Author: Damian Guy <da...@continuum.local>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1472 from dguy/KAFKA-3561


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d9d1cb2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d9d1cb2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d9d1cb2

Branch: refs/heads/trunk
Commit: 7d9d1cb2355e33270703280ed6bb712033b03d26
Parents: 54ba228
Author: Damian Guy <da...@gmail.com>
Authored: Thu Jun 16 11:56:32 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 16 11:56:32 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   2 +-
 .../java/org/apache/kafka/test/TestUtils.java   |  15 +
 .../examples/pageview/PageViewTypedDemo.java    |  12 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   3 +-
 .../examples/wordcount/WordCountDemo.java       |   3 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   2 +
 .../kafka/streams/kstream/KGroupedStream.java   | 119 ++++
 .../apache/kafka/streams/kstream/KStream.java   | 215 ++-----
 .../kafka/streams/kstream/KStreamBuilder.java   |   4 +-
 .../kstream/internals/KGroupedStreamImpl.java   | 180 ++++++
 .../kstream/internals/KGroupedTableImpl.java    |   4 +-
 .../kstream/internals/KStreamAggregate.java     |   8 +-
 .../streams/kstream/internals/KStreamImpl.java  | 477 ++++++++--------
 .../kstream/internals/KStreamKStreamJoin.java   |   8 +-
 .../kstream/internals/KStreamReduce.java        |   9 +-
 .../kstream/internals/KStreamWindowReduce.java  |   2 +-
 .../streams/kstream/internals/KTableImpl.java   |   2 +-
 .../streams/processor/PartitionGrouper.java     |   2 +-
 .../streams/processor/TopologyBuilder.java      |  56 +-
 .../processor/internals/RecordCollector.java    |   4 +-
 .../internals/StreamPartitionAssignor.java      |  59 +-
 .../processor/internals/StreamThread.java       |   2 +-
 .../InternalTopicIntegrationTest.java           |   9 +-
 .../integration/JoinIntegrationTest.java        |   5 +-
 .../KGroupedStreamIntegrationTest.java          | 472 ++++++++++++++++
 .../integration/KStreamRepartitionJoinTest.java | 565 +++++++++++++++++++
 .../integration/WordCountIntegrationTest.java   |  24 +-
 .../integration/utils/IntegrationTestUtils.java |  14 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |   6 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  16 -
 .../internals/KStreamWindowAggregateTest.java   |  28 +-
 .../streams/processor/TopologyBuilderTest.java  |  10 +-
 .../internals/StreamPartitionAssignorTest.java  |  61 +-
 .../streams/smoketest/SmokeTestClient.java      |  26 +-
 streams/src/test/resources/log4j.properties     |  21 +
 35 files changed, 1944 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5f52cce..9a099d0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -148,7 +148,7 @@
       <allow pkg="scala" />
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
-      <allow pkg="org.hamcrest.CoreMatchers" />
+      <allow pkg="org.hamcrest" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 742d14f..a818d53 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -27,8 +27,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -163,4 +165,17 @@ public class TestUtils {
         return memoryRecords.buffer();
     }
 
+    public static Properties producerConfig(final String bootstrapServers,
+                                            final Class keySerializer,
+                                            final Class valueSerializer,
+                                            final Properties additional) {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ProducerConfig.ACKS_CONFIG, "all");
+        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+        properties.putAll(additional);
+        return properties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index e53b037..19391d8 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -131,9 +131,16 @@ public class PageViewTypedDemo {
         final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>();
         serdeProps.put("JsonPOJOClass", RegionCount.class);
         regionCountDeserializer.configure(serdeProps, false);
-
         final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);
 
+        final Serializer<PageViewByRegion> pageViewByRegionSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
+        pageViewByRegionSerializer.configure(serdeProps, false);
+        final Deserializer<PageViewByRegion> pageViewByRegionDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
+        pageViewByRegionDeserializer.configure(serdeProps, false);
+        final Serde<PageViewByRegion> pageViewByRegionSerde = Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
+
         KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input");
 
         KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde, "streams-userprofile-input");
@@ -160,7 +167,8 @@ public class PageViewTypedDemo {
                         return new KeyValue<>(viewRegion.region, viewRegion);
                     }
                 })
-                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
+                .groupByKey(Serdes.String(), pageViewByRegionSerde)
+                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 8a0af6c..e9aa467 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -99,7 +99,8 @@ public class PageViewUntypedDemo {
                         return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
                     }
                 })
-                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
+                .groupByKey(Serdes.String(), jsonSerde)
+                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 12395f9..bf1d8cb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -72,7 +72,8 @@ public class WordCountDemo {
                         return new KeyValue<>(value, value);
                     }
                 })
-                .countByKey("Counts");
+                .groupByKey()
+                .count("Counts");
 
         // need to override value serde to Long type
         counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f05b02c..6605335 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -134,6 +134,8 @@ public class KafkaStreams {
         // The application ID is a required config and hence should always have value
         String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
+        builder.setApplicationId(applicationId);
+
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
new file mode 100644
index 0000000..25fdb3a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> of key-value pairs
+ * usually grouped on a different key than the original stream key.
+ *
+ * <p>
+ * It is an intermediate representation of a {@link KStream} before an
+ * aggregation is applied to the new partitions resulting in a new {@link KTable}.
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ *
+ * @see KStream
+ */
+@InterfaceStability.Unstable
+public interface KGroupedStream<K, V> {
+
+
+    /**
+     * Combine values of this stream by the grouped key into a new instance of ever-updating
+     * {@link KTable}.
+     *
+     * @param reducer           the instance of {@link Reducer}
+     * @param name              the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(Reducer<V> reducer,
+                        String name);
+
+
+    /**
+     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     *
+     * @param reducer           the instance of {@link Reducer}
+     * @param windows           the specification of the aggregation {@link Windows}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+                                                     Windows<W> windows);
+
+    /**
+     * Aggregate values of this stream by key into a new instance of a {@link KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> aggregator,
+                               Serde<T> aggValueSerde,
+                               String name);
+
+    /**
+     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
+                                                           Aggregator<K, V, T> aggregator,
+                                                           Windows<W> windows,
+                                                           Serde<T> aggValueSerde);
+
+
+    /**
+     * Count number of records of this stream by key into a new instance of a {@link KTable}
+     *
+     * @param name  the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count(String name);
+
+
+    /**
+     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     *
+     * @param windows   the specification of the aggregation {@link Windows}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a1ecfa4..ae743b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -317,6 +317,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
+     * If a record key is null it will not be included in the resulting {@link KStream}.
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -343,7 +344,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -361,6 +362,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
+     * If a record key is null it will not be included in the resulting {@link KStream}.
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -387,7 +389,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -405,12 +407,15 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
+     * If a record key is null it will not be included in the resulting {@link KStream}.
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
      * @param windows           the specification of the {@link JoinWindows}
      * @param keySerde          key serdes for materializing the other stream,
      *                          if not specified the default serdes defined in the configs will be used
+     * @param thisValSerde    value serdes for materializing this stream,
+     *                          if not specified the default serdes defined in the configs will be used
      * @param otherValueSerde   value serdes for materializing the other stream,
      *                          if not specified the default serdes defined in the configs will be used
      * @param <V1>              the value type of the other stream
@@ -424,11 +429,12 @@ public interface KStream<K, V> {
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serde<K> keySerde,
+            Serde<V> thisValSerde,
             Serde<V1> otherValueSerde);
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}.
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -446,6 +452,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
+     * If a record key is null it will not be included in the resulting {@link KStream}.
      *
      * @param table     the instance of {@link KTable} joined with this stream
      * @param joiner    the instance of {@link ValueJoiner}
@@ -458,182 +465,76 @@ public interface KStream<K, V> {
     <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
 
     /**
-     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     *
-     * @param reducer           the instance of {@link Reducer}
-     * @param windows           the specification of the aggregation {@link Windows}
-     * @param keySerde          key serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param valueSerde        value serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) aggregate for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                          Windows<W> windows,
-                                                          Serde<K> keySerde,
-                                                          Serde<V> valueSerde);
-
-    /**
-     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param reducer the instance of {@link Reducer}
-     * @param windows the specification of the aggregation {@link Windows}
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) aggregate for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows);
-
-    /**
-     * Combine values of this stream by key into a new instance of ever-updating {@link KTable}.
-     *
-     * @param reducer           the instance of {@link Reducer}
-     * @param keySerde          key serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param valueSerde        value serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param name              the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
-     */
-    KTable<K, V> reduceByKey(Reducer<V> reducer,
-                             Serde<K> keySerde,
-                             Serde<V> valueSerde,
-                             String name);
-
-    /**
-     * Combine values of this stream by key into a new instance of ever-updating {@link KTable} with default serializers and deserializers.
-     *
-     * @param reducer the instance of {@link Reducer}
-     * @param name    the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
-     */
-    KTable<K, V> reduceByKey(Reducer<V> reducer, String name);
-
-    /**
-     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
+     * If a record key is null it will not be included in the resulting {@link KStream}.
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param windows       the specification of the aggregation {@link Windows}
-     * @param keySerde      key serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     * @param table         the instance of {@link KTable} joined with this stream
+     * @param valueJoiner   the instance of {@link ValueJoiner}
+     * @param keySerde      key serdes for materializing this stream.
+     *                      If not specified the default serdes defined in the configs will be used
+     * @param valSerde      value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param <V1>          the value type of the table
+     * @param <V2>          the value type of the new stream
      *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                Aggregator<K, V, T> aggregator,
-                                                                Windows<W> windows,
-                                                                Serde<K> keySerde,
-                                                                Serde<T> aggValueSerde);
-
+    <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table,
+                                     ValueJoiner<V, V1, V2> valueJoiner,
+                                     Serde<K> keySerde,
+                                     Serde<V> valSerde);
     /**
-     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}
-     * with default serializers and deserializers.
+     * Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and
+     * default serializers and deserializers. If a record key is null it will not be included in
+     * the resulting {@link KGroupedStream}.
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param windows       the specification of the aggregation {@link Windows}
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param selector      select the grouping key and value to be aggregated
+     * @param <K1>          the key type of the {@link KGroupedStream}
+     * @param <V1>          the value type of the {@link KGroupedStream}
      *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                Aggregator<K, V, T> aggregator,
-                                                                Windows<W> windows);
+    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector);
 
     /**
-     * Aggregate values of this stream by key into a new instance of ever-updating {@link KTable}.
+     * Group the records of this {@link KStream} using the provided {@link KeyValueMapper}.
+     * If a record key is null it will not be included in the resulting {@link KGroupedStream}.
      *
-     * @param initializer   the class of {@link Initializer}
-     * @param aggregator    the class of {@link Aggregator}
-     * @param keySerde      key serdes for materializing the aggregated table,
+     * @param selector      select the grouping key and value to be aggregated
+     * @param keySerde      key serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     * @param valSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param name          the name of the resulted {@link KTable}
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param <K1>          the key type of the {@link KGroupedStream}
+     * @param <V1>          the value type of the {@link KGroupedStream}
      *
-     * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
+     * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
      */
-    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                    Aggregator<K, V, T> aggregator,
-                                    Serde<K> keySerde,
-                                    Serde<T> aggValueSerde,
-                                    String name);
+    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector,
+                                            Serde<K1> keySerde,
+                                            Serde<V1> valSerde);
 
     /**
-     * Aggregate values of this stream by key into a new instance of ever-updating {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param initializer   the class of {@link Initializer}
-     * @param aggregator    the class of {@link Aggregator}
-     * @param name          the name of the resulted {@link KTable}
-     * @param <T>           the value type of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
+     * Group the records with the same key into a {@link KGroupedStream} while preserving the
+     * original values. If a record key is null it will not be included in the resulting
+     * {@link KGroupedStream}.
+     * Default serdes will be used.
+     * @return a {@link KGroupedStream}
      */
-    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                    Aggregator<K, V, T> aggregator,
-                                    String name);
+    KGroupedStream<K, V> groupByKey();
 
     /**
-     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     *
-     * @param windows       the specification of the aggregation {@link Windows}
-     * @param keySerde      key serdes for materializing the counting table,
+     * Group the records with the same key into a {@link KGroupedStream} while preserving the
+     * original values. If a record key is null it will not be included in the resulting
+     * {@link KGroupedStream}.
+     * @param keySerde      key serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
-
-    /**
-     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param windows       the specification of the aggregation {@link Windows}
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
-
-    /**
-     * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}.
-     *
-     * @param keySerde      key serdes for materializing the counting table,
+     * @param valSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param name          the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
-     */
-    KTable<K, Long> countByKey(Serde<K> keySerde, String name);
-
-    /**
-     * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param name          the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+     * @return a {@link KGroupedStream}
      */
-    KTable<K, Long> countByKey(String name);
+    KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
+                                    Serde<V> valSerde);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 53b2f4e..37d8921 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -89,7 +89,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
 
@@ -111,7 +111,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
new file mode 100644
index 0000000..1830484
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
+
+    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
+    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+    private final boolean repartitionRequired;
+
+    public KGroupedStreamImpl(final KStreamBuilder topology,
+                              final String name,
+                              final Set<String> sourceNodes,
+                              final Serde<K> keySerde,
+                              final Serde<V> valSerde,
+                              final boolean repartitionRequired) {
+        super(topology, name, sourceNodes);
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> reducer,
+                               final String name) {
+        return doAggregate(
+            new KStreamReduce<K, V>(name, reducer),
+            REDUCE_NAME,
+            keyValueStore(valSerde, name));
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+                                                            Windows<W> windows) {
+        return (KTable<Windowed<K>, V>) doAggregate(
+            new KStreamWindowReduce<K, V, W>(windows, windows.name(), reducer),
+            REDUCE_NAME,
+            windowedStore(valSerde, windows)
+        );
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<K, V, T> aggregator,
+                                      final Serde<T> aggValueSerde,
+                                      final String name) {
+        return doAggregate(
+            new KStreamAggregate<>(name, initializer, aggregator),
+            AGGREGATE_NAME,
+            keyValueStore(aggValueSerde, name));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                                  final Aggregator<K, V, T> aggregator,
+                                                                  final Windows<W> windows,
+                                                                  final Serde<T> aggValueSerde) {
+        return (KTable<Windowed<K>, T>) doAggregate(
+            new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator),
+            AGGREGATE_NAME,
+            windowedStore(aggValueSerde, windows)
+        );
+    }
+
+    @Override
+    public KTable<K, Long> count(final String name) {
+        return aggregate(new Initializer<Long>() {
+            @Override
+            public Long apply() {
+                return 0L;
+            }
+        }, new Aggregator<K, V, Long>() {
+            @Override
+            public Long apply(K aggKey, V value, Long aggregate) {
+                return aggregate + 1;
+            }
+        }, Serdes.Long(), name);
+    }
+
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
+        return aggregate(new Initializer<Long>() {
+            @Override
+            public Long apply() {
+                return 0L;
+            }
+        }, new Aggregator<K, V, Long>() {
+            @Override
+            public Long apply(K aggKey, V value, Long aggregate) {
+                return aggregate + 1;
+            }
+        }, windows, Serdes.Long());
+    }
+
+    private <T> StateStoreSupplier keyValueStore(final Serde<T> aggValueSerde, final String name) {
+        return storeFactory(aggValueSerde, name).build();
+    }
+
+
+    private <W extends Window, T> StateStoreSupplier windowedStore(final Serde<T> aggValSerde,
+                                                                   final Windows<W> windows) {
+        return storeFactory(aggValSerde, windows.name())
+            .windowed(windows.maintainMs(), windows.segments, false)
+            .build();
+
+    }
+
+    private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<T> aggValueSerde,
+                                                                    final String name) {
+        return Stores.create(name)
+            .withKeys(keySerde)
+            .withValues(aggValueSerde)
+            .persistent();
+
+    }
+
+    private <T> KTable<K, T> doAggregate(
+        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+        final String functionName,
+        final StateStoreSupplier storeSupplier) {
+
+        final String aggFunctionName = topology.newName(functionName);
+
+        final String sourceName = repartitionIfRequired();
+
+        topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
+        topology.addStateStore(storeSupplier, aggFunctionName);
+
+        return new KTableImpl<>(topology,
+                                aggFunctionName,
+                                aggregateSupplier,
+                                sourceName.equals(this.name) ? sourceNodes
+                                                             : Collections.singleton(sourceName));
+    }
+
+    /**
+     * @return the new source name if repartitioned; otherwise the name of this stream
+     */
+    private String repartitionIfRequired() {
+        if (!repartitionRequired) {
+            return this.name;
+        }
+        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index f7fe4e5..7118bb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -45,8 +45,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
-
     protected final Serde<K> keySerde;
     protected final Serde<V> valSerde;
 
@@ -88,7 +86,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
         String funcName = topology.newName(functionName);
 
-        String topic = name + REPARTITION_TOPIC_SUFFIX;
+        String topic = name + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b6d1492..dc6410d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -61,14 +60,11 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V value) {
-            // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for KStream aggregate operator with state " + storeName + " should not be null.");
+                return;
 
             T oldAgg = store.get(key);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 91bcef9..79ff842 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -17,18 +17,16 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -36,9 +34,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -47,18 +42,17 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.lang.reflect.Array;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
 
-    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
-
     private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
 
     private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
 
-    private static final String FILTER_NAME = "KSTREAM-FILTER-";
+    public static final String FILTER_NAME = "KSTREAM-FILTER-";
 
     private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
 
@@ -84,8 +78,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
 
-    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
-
     private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
 
     public static final String SINK_NAME = "KSTREAM-SINK-";
@@ -100,8 +92,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
 
-    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
+    public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
+    private final boolean repartitionRequired;
+
+    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes,
+                       boolean repartitionRequired) {
         super(topology, name, sourceNodes);
+        this.repartitionRequired = repartitionRequired;
     }
 
     @Override
@@ -110,7 +108,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -119,20 +117,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
+        return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
+    }
+
+    private <K1> String internalSelectKey(final KeyValueMapper<K, V, K1> mapper) {
         String name = topology.newName(KEY_SELECT_NAME);
         topology.addProcessor(name, new KStreamMap<>(new KeyValueMapper<K, V, KeyValue<K1, V>>() {
             @Override
             public KeyValue<K1, V> apply(K key, V value) {
-                return new KeyValue(mapper.apply(key, value), value);
+                return new KeyValue<>(mapper.apply(key, value), value);
             }
         }), this.name);
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return name;
     }
 
     @Override
@@ -141,16 +143,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
+
     @Override
     public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
         String name = topology.newName(MAPVALUES_NAME);
 
         topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -193,7 +196,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
     @Override
@@ -202,7 +205,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -218,7 +221,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
             topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
 
-            branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes);
+            branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes, this.repartitionRequired);
         }
 
         return branchChildren;
@@ -245,7 +248,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes);
+        return new KStreamImpl<>(topology, name, allSourceNodes, false);
     }
 
     @Override
@@ -315,7 +318,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, null, true);
     }
 
     @Override
@@ -325,7 +328,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -388,45 +391,87 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             Serde<V1> otherValueSerde,
             boolean outer) {
 
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+        return doJoin(other,
+                      joiner,
+                      windows,
+                      keySerde,
+                      thisValueSerde,
+                      otherValueSerde,
+                      new DefaultJoin(outer));
+    }
+
+    private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other,
+                                         ValueJoiner<V, V1, R> joiner,
+                                         JoinWindows windows,
+                                         Serde<K> keySerde,
+                                         Serde<V> thisValueSerde,
+                                         Serde<V1> otherValueSerde,
+                                         KStreamImplJoin join) {
+        KStreamImpl<K, V> joinThis = this;
+        KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
+
+        if (joinThis.repartitionRequired) {
+            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde);
+        }
 
-        StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
-                .withKeys(keySerde)
-                .withValues(thisValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
+        if (joinOther.repartitionRequired) {
+            joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde);
+        }
+
+        joinThis.ensureJoinableWith(joinOther);
 
-        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
-                .withKeys(keySerde)
-                .withValues(otherValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
+        return join.join(joinThis,
+                         joinOther,
+                         joiner,
+                         windows,
+                         keySerde,
+                         thisValueSerde,
+                         otherValueSerde);
+    }
 
-        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
 
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, outer);
-        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer);
+    /**
+     * Repartition a stream. This is required on join operations occurring after
+     * an operation that changes the key, e.g., selectKey, map(..), flatMap(..).
+     * @param keySerde      Serdes for serializing the keys
+     * @param valSerde      Serdes for serializing the values
+     * @return a new {@link KStreamImpl}
+     */
+    private KStreamImpl<K, V> repartitionForJoin(Serde<K> keySerde,
+                                                 Serde<V> valSerde) {
 
-        KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
+        String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde);
+        return new KStreamImpl<>(topology, repartitionedSourceName, Collections
+            .singleton(repartitionedSourceName), false);
+    }
 
-        String thisWindowStreamName = topology.newName(WINDOWED_NAME);
-        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-        String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
-        String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
-        String joinMergeName = topology.newName(MERGE_NAME);
+    static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
+                                                    Serde<K1> keySerde,
+                                                    Serde<V1> valSerde) {
+        Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
+        Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
+        Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
+        Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
 
-        topology.addProcessor(thisWindowStreamName, thisWindowedStream, this.name);
-        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
-        topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
-        topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
-        topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);
+        String repartitionTopic = stream.name + REPARTITION_TOPIC_SUFFIX;
+        String sinkName = stream.topology.newName(SINK_NAME);
+        String filterName = stream.topology.newName(FILTER_NAME);
+        String sourceName = stream.topology.newName(SOURCE_NAME);
 
-        return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
+        stream.topology.addInternalTopic(repartitionTopic);
+        stream.topology.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
+            @Override
+            public boolean test(final K1 key, final V1 value) {
+                return key != null;
+            }
+        }, false), stream.name);
+
+        stream.topology.addSink(sinkName, repartitionTopic, keySerializer,
+                         valSerializer, filterName);
+        stream.topology.addSource(sourceName, keyDeserializer, valDeserializer,
+                           repartitionTopic);
+
+        return sourceName;
     }
 
     @SuppressWarnings("unchecked")
@@ -436,28 +481,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serde<K> keySerde,
+            Serde<V> thisValSerde,
             Serde<V1> otherValueSerde) {
 
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
-        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
-                .withKeys(keySerde)
-                .withValues(otherValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
-
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
-
-        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-        String joinThisName = topology.newName(LEFTJOIN_NAME);
-
-        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
-
-        return new KStreamImpl<>(topology, joinThisName, allSourceNodes);
+        return doJoin(other,
+                      joiner,
+                      windows,
+                      keySerde,
+                      thisValSerde,
+                      otherValueSerde,
+                      new LeftJoin());
     }
 
     @Override
@@ -466,193 +499,197 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows) {
 
-        return leftJoin(other, joiner, windows, null, null);
+        return leftJoin(other, joiner, windows, null, null, null);
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
-        String name = topology.newName(LEFTJOIN_NAME);
-
-        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
-        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
+        return leftJoin(other, joiner, null, null);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes);
     }
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                                 Windows<W> windows,
-                                                                 Serde<K> keySerde,
-                                                                 Serde<V> aggValueSerde) {
+    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other,
+                                          ValueJoiner<V, V1, R> joiner,
+                                          Serde<K> keySerde,
+                                          Serde<V> valueSerde) {
+
+        if (repartitionRequired) {
+            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
+                                                                                valueSerde
+            );
+            return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner);
+        } else {
+            return doStreamTableLeftJoin(other, joiner);
+        }
 
-        String reduceName = topology.newName(REDUCE_NAME);
+    }
 
-        KStreamWindowReduce<K, V, W> reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer);
+    private <V1, R> KStream<K, R> doStreamTableLeftJoin(final KTable<K, V1> other,
+                                                        final ValueJoiner<V, V1, R> joiner) {
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        StateStoreSupplier reduceStore = Stores.create(windows.name())
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, false)
-                .build();
+        String name = topology.newName(LEFTJOIN_NAME);
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, reduceSupplier, this.name);
-        topology.addStateStore(reduceStore, reduceName);
+        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
+        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
+        return new KStreamImpl<>(topology, name, allSourceNodes, false);
     }
 
     @Override
-    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                                 Windows<W> windows) {
-
-        return reduceByKey(reducer, windows, null, null);
+    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector) {
+        return groupBy(selector, null, null);
     }
 
     @Override
-    public KTable<K, V> reduceByKey(Reducer<V> reducer,
-                                    Serde<K> keySerde,
-                                    Serde<V> aggValueSerde,
-                                    String name) {
-
-        String reduceName = topology.newName(REDUCE_NAME);
-
-        KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, reducer);
+    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> valSerde) {
 
-        StateStoreSupplier reduceStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
-
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, reduceSupplier, this.name);
-        topology.addStateStore(reduceStore, reduceName);
-
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
+        String selectName = internalSelectKey(selector);
+        return new KGroupedStreamImpl<>(topology,
+                                        selectName,
+                                        sourceNodes,
+                                        keySerde,
+                                        valSerde, true);
     }
 
     @Override
-    public KTable<K, V> reduceByKey(Reducer<V> reducer, String name) {
-
-        return reduceByKey(reducer, null, null, name);
+    public KGroupedStream<K, V> groupByKey() {
+        return groupByKey(null, null);
     }
 
     @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                       Aggregator<K, V, T> aggregator,
-                                                                       Windows<W> windows,
-                                                                       Serde<K> keySerde,
-                                                                       Serde<T> aggValueSerde) {
-
-        String aggregateName = topology.newName(AGGREGATE_NAME);
-
-        KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator);
-
-        StateStoreSupplier aggregateStore = Stores.create(windows.name())
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, false)
-                .build();
+    public KGroupedStream<K, V> groupByKey(Serde<K> keySerde, Serde<V> valSerde) {
+        // Grouping on the existing key adds no processor node: the grouped stream is built on
+        // this stream's own node and source nodes, and carries over its repartitionRequired flag.
+        final boolean needsRepartition = this.repartitionRequired;
+        return new KGroupedStreamImpl<>(
+            topology, this.name, sourceNodes,
+            keySerde, valSerde,
+            needsRepartition);
+    }
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
-        topology.addStateStore(aggregateStore, aggregateName);
 
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<Windowed<K>, T, T>(topology, aggregateName, aggregateSupplier, sourceNodes);
+    // Builds a persistent, windowed state store supplier named windows.name() + nameSuffix.
+    private static <K, V> StateStoreSupplier createWindowedStateStore(
+            final JoinWindows windows, final Serde<K> keySerde, final Serde<V> valueSerde, final String nameSuffix) {
+        final String storeName = windows.name() + nameSuffix;
+        return Stores.create(storeName)
+            .withKeys(keySerde)
+            .withValues(valueSerde)
+            .persistent()
+            .windowed(windows.maintainMs(), windows.segments, true)
+            .build();
     }
 
-    @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                       Aggregator<K, V, T> aggregator,
-                                                                       Windows<W> windows) {
+    private interface KStreamImplJoin {
 
-        return aggregateByKey(initializer, aggregator, windows, null, null);
+        <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                            KStream<K1, V2> other,
+                                            ValueJoiner<V1, V2, R> joiner,
+                                            JoinWindows windows,
+                                            Serde<K1> keySerde,
+                                            Serde<V1> lhsValueSerde,
+                                            Serde<V2> otherValueSerde);
     }
 
-    @Override
-    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                           Aggregator<K, V, T> aggregator,
-                                           Serde<K> keySerde,
-                                           Serde<T> aggValueSerde,
-                                           String name) {
+    private class DefaultJoin implements KStreamImplJoin {
 
-        String aggregateName = topology.newName(AGGREGATE_NAME);
+        private final boolean outer;
 
-        KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator);
+        DefaultJoin(final boolean outer) {
+            this.outer = outer;
+        }
 
-        StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
+        @Override
+        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                                   KStream<K1, V2> other,
+                                                   ValueJoiner<V1, V2, R> joiner,
+                                                   JoinWindows windows,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> lhsValueSerde,
+                                                   Serde<V2> otherValueSerde) {
+            // One windowed store per side ("-this" / "-other"): same key serde, that side's value serde.
+            StateStoreSupplier thisWindow =
+                createWindowedStateStore(windows, keySerde, lhsValueSerde, "-this");
+
+            StateStoreSupplier otherWindow =
+                createWindowedStateStore(windows, keySerde, otherValueSerde, "-other");
+
+            // Each window processor is bound to its own side's store.
+            KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(),
+                                                                                   windows.before + windows.after + 1,
+                                                                                   windows.maintainMs());
+            KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(),
+                                                                                    windows.before + windows.after + 1,
+                                                                                    windows.maintainMs());
+            // Each join processor is bound to the opposite side's store; joinOther uses the reversed joiner.
+            KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
+                                                                                  windows.before,
+                                                                                  windows.after,
+                                                                                  joiner,
+                                                                                  outer);
+            KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
+                                                                                   windows.before,
+                                                                                   windows.after,
+                                                                                   reverseJoiner(joiner),
+                                                                                   outer);
+
+            KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
+
+            String thisWindowStreamName = topology.newName(WINDOWED_NAME);
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+            String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+            String joinMergeName = topology.newName(MERGE_NAME);
+            // Wire the topology; each store is attached to exactly the two processors built with its name above.
+            topology.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream) lhs).name);
+            topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
+            topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
+            topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
+            topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+            topology.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
+            topology.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
+
+            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
+            allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
+            return new KStreamImpl<>(topology, joinMergeName, allSourceNodes, false);
+        }
+    }
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
-        topology.addStateStore(aggregateStore, aggregateName);
 
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
-    }
+    private class LeftJoin implements KStreamImplJoin {
 
-    @Override
-    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                           Aggregator<K, V, T> aggregator,
-                                           String name) {
+        @Override
+        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                                   KStream<K1, V2> other,
+                                                   ValueJoiner<V1, V2, R> joiner,
+                                                   JoinWindows windows,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> lhsValueSerde,
+                                                   Serde<V2> otherValueSerde) {
+            StateStoreSupplier otherWindow =
+                createWindowedStateStore(windows, keySerde, otherValueSerde, "-other");
 
-        return aggregateByKey(initializer, aggregator, null, null, name);
-    }
+            KStreamJoinWindow<K1, V1>
+                otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+            KStreamKStreamJoin<K1, R, V1, V2>
+                joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
-                                                                   Serde<K> keySerde) {
-        return this.aggregateByKey(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, windows, keySerde, Serdes.Long());
-    }
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = topology.newName(LEFTJOIN_NAME);
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows) {
-        return countByKey(windows, null);
-    }
+            topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
+            topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);
+            topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
 
-    @Override
-    public KTable<K, Long> countByKey(Serde<K> keySerde, String name) {
-        return this.aggregateByKey(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, keySerde, Serdes.Long(), name);
+            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
+            allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
+            return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false);
+        }
     }
 
-    @Override
-    public KTable<K, Long> countByKey(String name) {
-        return countByKey(null, name);
-    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 72029a8..edde009 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,14 +60,11 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
             otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V1 value) {
-            // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for KStream-KStream join operator with other window state store " + otherWindowName + " should not be null.");
+                return;
 
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index ed6e216..dd5ba45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -58,14 +57,12 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V value) {
-            // the keys should never be null
+            // If the key is null we don't need to proceed
             if (key == null)
-                throw new StreamsException("Record key for KStream reduce operator with state " + storeName + " should not be null.");
+                return;
 
             V oldAgg = store.get(key);
             V newAgg = oldAgg;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index a526506..46d99a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -68,7 +68,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
         @Override
         public void process(K key, V value) {
-            // if the key is null, we do not need proceed aggregating the record
+            // if the key is null, we do not need to proceed aggregating
             // the record with the table
             if (key == null)
                 return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 51d4cb4..c5543ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -229,7 +229,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             }
         }), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, false);
     }
 
     @Override