You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/29 23:03:37 UTC

[2/2] kafka git commit: KAFKA-3192: Add unwindowed aggregations for KStream; and make all example code executable

KAFKA-3192: Add unwindowed aggregations for KStream; and make all example code executable

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda, Michael G. Noll, Jun Rao

Closes #870 from guozhangwang/K3192


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/845c6eae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/845c6eae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/845c6eae

Branch: refs/heads/trunk
Commit: 845c6eae1f6c6bcf117f5baa53bb19b4611c0528
Parents: a731297
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 29 14:03:32 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 29 14:03:32 2016 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |   1 +
 .../scala/kafka/tools/ConsoleConsumer.scala     |  23 +-
 .../examples/pageview/JsonPOJODeserializer.java |   5 -
 .../pageview/JsonTimestampExtractor.java        |  46 +++
 .../examples/pageview/PageViewTypedJob.java     |  88 +++++-
 .../examples/pageview/PageViewUnTypedJob.java   |  87 ++++--
 .../kafka/streams/examples/pipe/PipeJob.java    |  18 +-
 .../examples/wordcount/WordCountJob.java        |  48 ++-
 .../wordcount/WordCountProcessorJob.java        |  23 +-
 .../apache/kafka/streams/kstream/KStream.java   |  39 +++
 .../internals/KStreamAggProcessorSupplier.java  |  28 ++
 .../kstream/internals/KStreamAggWindow.java     |  51 ----
 .../kstream/internals/KStreamAggregate.java     | 119 ++------
 .../streams/kstream/internals/KStreamImpl.java  | 100 +++++--
 .../kstream/internals/KStreamReduce.java        | 124 +++-----
 .../internals/KStreamWindowAggregate.java       | 171 +++++++++++
 .../kstream/internals/KStreamWindowReduce.java  | 165 +++++++++++
 .../streams/kstream/internals/KTableImpl.java   |  10 +-
 .../internals/KTableProcessorSupplier.java      |   1 -
 .../streams/processor/TopologyBuilder.java      |  26 ++
 .../processor/internals/RecordQueue.java        |   8 +
 .../internals/StreamPartitionAssignor.java      |   2 +-
 .../processor/internals/StreamThread.java       |   1 -
 .../kstream/internals/KStreamAggregateTest.java | 294 -------------------
 .../internals/KStreamWindowAggregateTest.java   | 294 +++++++++++++++++++
 .../kstream/internals/KTableAggregateTest.java  |   1 +
 .../processor/internals/PartitionGroupTest.java |   4 +-
 .../processor/internals/RecordQueueTest.java    |   4 +-
 .../processor/internals/StreamTaskTest.java     |  19 +-
 29 files changed, 1168 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8e93973..9d24d07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -343,6 +343,7 @@ public class Sender implements Runnable {
                 handleProduceResponse(response, recordsByPartition, time.milliseconds());
             }
         };
+
         return new ClientRequest(now, acks != 0, send, callback);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0ae057f..0d85aca 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -29,6 +29,7 @@ import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.serialization.{Deserializer, ByteArrayDeserializer}
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
 
@@ -349,7 +350,12 @@ class DefaultMessageFormatter extends MessageFormatter {
   var keySeparator = "\t".getBytes
   var lineSeparator = "\n".getBytes
 
+  var keyDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer()
+  var valDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer()
+
   override def init(props: Properties) {
+    System.out.println(props)
+
     if (props.containsKey("print.timestamp"))
       printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
     if (props.containsKey("print.key"))
@@ -358,6 +364,19 @@ class DefaultMessageFormatter extends MessageFormatter {
       keySeparator = props.getProperty("key.separator").getBytes
     if (props.containsKey("line.separator"))
       lineSeparator = props.getProperty("line.separator").getBytes
+
+    if (props.containsKey("key.decoder")) {
+      keyDecoder = Class.forName(props.getProperty("key.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]]
+
+      System.out.println("update key decoder")
+    }
+    if (props.containsKey("value.decoder")) {
+      valDecoder = Class.forName(props.getProperty("value.decoder")).newInstance().asInstanceOf[Deserializer[_ <: Object]]
+
+      System.out.println("update value decoder")
+    }
+    System.out.println(keyDecoder)
+    System.out.println(valDecoder)
   }
 
   def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType, output: PrintStream) {
@@ -369,10 +388,10 @@ class DefaultMessageFormatter extends MessageFormatter {
       output.write(keySeparator)
     }
     if (printKey) {
-      output.write(if (key == null) "null".getBytes else key)
+      output.write(if (key == null) "null".getBytes else keyDecoder.deserialize(null, key).toString.getBytes)
       output.write(keySeparator)
     }
-    output.write(if (value == null) "null".getBytes else value)
+    output.write(if (value == null) "null".getBytes else valDecoder.deserialize(null, value).toString.getBytes)
     output.write(lineSeparator)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
index 583ec2d..5fcd1f3 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
@@ -16,17 +16,12 @@
  **/
 package org.apache.kafka.streams.examples.pageview;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.Map;
 
-/**
- * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
- * structured data without having associated Java classes. This deserializer also supports Connect schemas.
- */
 public class JsonPOJODeserializer<T> implements Deserializer<T> {
     private ObjectMapper objectMapper = new ObjectMapper();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
new file mode 100644
index 0000000..6443193
--- /dev/null
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples.pageview;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+/**
+ * A timestamp extractor implementation that tries to extract event time from
+ * the "timestamp" field in the Json formatted message.
+ */
+public class JsonTimestampExtractor implements TimestampExtractor {
+
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        if (record.value() instanceof PageViewTypedJob.PageView) {
+            return ((PageViewTypedJob.PageView) record.value()).timestamp;
+        }
+
+        if (record.value() instanceof PageViewTypedJob.UserProfile) {
+            return ((PageViewTypedJob.UserProfile) record.value()).timestamp;
+        }
+
+        if (record.value() instanceof JsonNode) {
+            return ((JsonNode) record.value()).get("timestamp").longValue();
+        }
+
+        throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index 358cbe8..3f9b283 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.examples.pageview;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -30,22 +31,38 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.StreamsConfig;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where each record in the stream
+ * or table is formatted as a JSON string, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
 public class PageViewTypedJob {
 
     // POJO classes
     static public class PageView {
         public String user;
         public String page;
+        public Long timestamp;
     }
 
     static public class UserProfile {
-        public String user;
         public String region;
+        public Long timestamp;
     }
 
     static public class PageViewByRegion {
@@ -66,13 +83,17 @@ public class PageViewTypedJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 
@@ -81,26 +102,59 @@ public class PageViewTypedJob {
         final Serializer<Long> longSerializer = new LongSerializer();
         final Deserializer<Long> longDeserializer = new LongDeserializer();
 
+        // TODO: the following can be removed with a serialization factory
+        Map<String, Object> serdeProps = new HashMap<>();
+
+        final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", PageView.class);
+        pageViewDeserializer.configure(serdeProps, false);
+
+        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileDeserializer.configure(serdeProps, false);
 
-        KStream<String, PageView> views = builder.stream("streams-pageview-input");
+        final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileSerializer.configure(serdeProps, false);
 
-        KStream<String, PageView> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record));
+        final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
+        wPageViewByRegionSerializer.configure(serdeProps, false);
 
-        KTable<String, UserProfile> users = builder.table("streams-userprofile-input");
+        final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", RegionCount.class);
+        regionCountSerializer.configure(serdeProps, false);
 
-        KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsByUser
-                .leftJoin(users, (view, profile) -> {
-                    PageViewByRegion viewByRegion = new PageViewByRegion();
-                    viewByRegion.user = view.user;
-                    viewByRegion.page = view.page;
-                    viewByRegion.region = profile.region;
+        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
 
-                    return viewByRegion;
+        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
+
+        KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
+                .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
+                    @Override
+                    public PageViewByRegion apply(PageView view, UserProfile profile) {
+                        PageViewByRegion viewByRegion = new PageViewByRegion();
+                        viewByRegion.user = view.user;
+                        viewByRegion.page = view.page;
+
+                        if (profile != null) {
+                            viewByRegion.region = profile.region;
+                        } else {
+                            viewByRegion.region = "UNKNOWN";
+                        }
+                        return viewByRegion;
+                    }
+                })
+                .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
+                    @Override
+                    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
+                        return new KeyValue<>(viewRegion.region, viewRegion);
+                    }
                 })
-                .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
                 .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
                         stringSerializer, longSerializer,
                         stringDeserializer, longDeserializer)
+                // TODO: we can merge this toStream().map(...) into a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
                     @Override
@@ -118,9 +172,15 @@ public class PageViewTypedJob {
                 });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
+        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
+
+        // usually a streaming job would run indefinitely;
+        // in this example we just let it run for some time and then stop, since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
index 2fdfa97..065f5f5 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.examples.pageview;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -31,27 +32,44 @@ import org.apache.kafka.connect.json.JsonDeserializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.Windowed;
 
 import java.util.Properties;
 
-public class PageViewUnTypedJob {
+/**
+ * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
+ * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
+ * in Kafka Streams.
+ *
+ * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * with a user profile table that reads from a topic named "streams-userprofile-input", where each record in the stream
+ * or table is formatted as a JSON string, to compute the number of pageviews per user region.
+ *
+ * Before running this example you must create the source topics (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to them (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
+public class PageViewUntypedJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
+        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 
@@ -59,47 +77,66 @@ public class PageViewUnTypedJob {
         final Deserializer<String> stringDeserializer = new StringDeserializer();
         final Serializer<Long> longSerializer = new LongSerializer();
         final Deserializer<Long> longDeserializer = new LongDeserializer();
-
-
-        KStream<String, JsonNode> views = builder.stream("streams-pageview-input");
-
-        KStream<String, JsonNode> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record));
-
-        KTable<String, JsonNode> users = builder.table("streams-userprofile-input");
-
-        KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
-
-        KStream<JsonNode, JsonNode> regionCount = viewsByUser
-                .leftJoin(userRegions, (view, region) -> {
-                    ObjectNode jNode = JsonNodeFactory.instance.objectNode();
-
-                    return (JsonNode) jNode.put("user", view.get("user").textValue())
-                            .put("page", view.get("page").textValue())
-                            .put("region", region);
+        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
+        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
+
+        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
+
+        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
+
+        KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
+            @Override
+            public String apply(JsonNode record) {
+                return record.get("region").textValue();
+            }
+        });
+
+        KStream<JsonNode, JsonNode> regionCount = views
+                .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
+                            @Override
+                            public JsonNode apply(JsonNode view, String region) {
+                                ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+
+                                return jNode.put("user", view.get("user").textValue())
+                                        .put("page", view.get("page").textValue())
+                                        .put("region", region == null ? "UNKNOWN" : region);
+                            }
+                        })
+                .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
+                    @Override
+                    public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
+                        return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
+                    }
                 })
-                .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
                 .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
                         stringSerializer, longSerializer,
                         stringDeserializer, longDeserializer)
+                // TODO: we can merge this toStream().map(...) into a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
                     @Override
                     public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
                         ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
                         keyNode.put("window-start", key.window().start())
-                                .put("region", key.window().start());
+                                .put("region", key.value());
 
                         ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
-                        keyNode.put("count", value);
+                        valueNode.put("count", value);
 
-                        return new KeyValue<JsonNode, JsonNode>((JsonNode) keyNode, (JsonNode) valueNode);
+                        return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
                     }
                 });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-output");
+        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
+
+        // usually a streaming job would run indefinitely;
+        // in this example we just let it run for some time and then stop, since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
index 841f37b..9e737ba 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -26,6 +26,16 @@ import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Properties;
 
+/**
+ * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
+ * write data to a sink (output) topic.
+ *
+ * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input"
+ * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
 public class PipeJob {
 
     public static void main(String[] args) throws Exception {
@@ -37,7 +47,7 @@ public class PipeJob {
         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
-        // can specify underlying client configs if necessary
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
@@ -46,5 +56,11 @@ public class PipeJob {
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
+
+        // usually a streaming job would run indefinitely;
+        // in this example we just let it run for some time and then stop, since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index b922695..da6b095 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -17,9 +17,6 @@
 
 package org.apache.kafka.streams.examples.wordcount;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -27,21 +24,29 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.connect.json.JsonSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.UnlimitedWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
 
 import java.util.Arrays;
 import java.util.Properties;
 
+/**
+ * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
 public class WordCountJob {
 
     public static void main(String[] args) throws Exception {
@@ -54,7 +59,7 @@ public class WordCountJob {
         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
-        // can specify underlying client configs if necessary
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
@@ -63,11 +68,10 @@ public class WordCountJob {
         final Deserializer<String> stringDeserializer = new StringDeserializer();
         final Serializer<Long> longSerializer = new LongSerializer();
         final Deserializer<Long> longDeserializer = new LongDeserializer();
-        final Serializer<JsonNode> JsonSerializer = new JsonSerializer();
 
         KStream<String, String> source = builder.stream("streams-file-input");
 
-        KStream<String, JsonNode> counts = source
+        KTable<String, Long> counts = source
                 .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                     @Override
                     public Iterable<String> apply(String value) {
@@ -79,25 +83,17 @@ public class WordCountJob {
                         return new KeyValue<String, String>(value, value);
                     }
                 })
-                .countByKey(UnlimitedWindows.of("Counts").startOn(0L),
-                        stringSerializer, longSerializer,
-                        stringDeserializer, longDeserializer)
-                .toStream()
-                .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<String, JsonNode>>() {
-                    @Override
-                    public KeyValue<String, JsonNode> apply(Windowed<String> key, Long value) {
-                        ObjectNode jNode = JsonNodeFactory.instance.objectNode();
+                .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");
 
-                        jNode.put("word", key.value())
-                             .put("count", value);
-
-                        return new KeyValue<String, JsonNode>(null, jNode);
-                    }
-                });
-
-        counts.to("streams-wordcount-output", stringSerializer, JsonSerializer);
+        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
+
+        // normally a streaming job would run indefinitely;
+        // in this example we just let it run for some time and then stop, since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
index 63692bd..61e8335 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java
@@ -33,6 +33,17 @@ import org.apache.kafka.streams.state.Stores;
 
 import java.util.Properties;
 
+/**
+ * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
+ * that computes a simple word occurrence histogram from an input text.
+ *
+ * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
+ * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
+ * is an updated count of a single word.
+ *
+ * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...)
+ * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */
 public class WordCountProcessorJob {
 
     private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
@@ -72,7 +83,7 @@ public class WordCountProcessorJob {
                 public void punctuate(long timestamp) {
                     KeyValueIterator<String, Integer> iter = this.kvStore.all();
 
-                    System.out.println("----------- " + timestamp + "----------- ");
+                    System.out.println("----------- " + timestamp + " ----------- ");
 
                     while (iter.hasNext()) {
                         KeyValue<String, Integer> entry = iter.next();
@@ -103,7 +114,7 @@ public class WordCountProcessorJob {
         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
-        // can specify underlying client configs if necessary
+        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -113,9 +124,15 @@ public class WordCountProcessorJob {
         builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
         builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
 
-        builder.addSink("Sink", "streams-wordcount-output", "Process");
+        builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
+
+        // normally a streaming job would run indefinitely;
+        // in this example we just let it run for some time and then stop, since the input data is finite.
+        Thread.sleep(5000L);
+
+        streams.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index b83298f..231eb22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -283,6 +283,18 @@ public interface KStream<K, V> {
     /**
      * Aggregate values of this stream by key on a window basis.
      *
+     * @param reducer the class of Reducer
+     */
+    KTable<K, V> reduceByKey(Reducer<V> reducer,
+                             Serializer<K> keySerializer,
+                             Serializer<V> aggValueSerializer,
+                             Deserializer<K> keyDeserializer,
+                             Deserializer<V> aggValueDeserializer,
+                             String name);
+
+    /**
+     * Aggregate values of this stream by key on a window basis.
+     *
      * @param initializer the class of Initializer
      * @param aggregator the class of Aggregator
      * @param windows the specification of the aggregation window
@@ -297,6 +309,22 @@ public interface KStream<K, V> {
                                                                 Deserializer<T> aggValueDeserializer);
 
     /**
+     * Aggregate values of this stream by key without a window basis, and hence
+     * return an ever updating table
+     *
+     * @param initializer the class of Initializer
+     * @param aggregator the class of Aggregator
+     * @param <T>   the value type of the aggregated table
+     */
+    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
+                                    Aggregator<K, V, T> aggregator,
+                                    Serializer<K> keySerializer,
+                                    Serializer<T> aggValueSerializer,
+                                    Deserializer<K> keyDeserializer,
+                                    Deserializer<T> aggValueDeserializer,
+                                    String name);
+
+    /**
      * Count number of messages of this stream by key on a window basis.
      *
      * @param windows the specification of the aggregation window
@@ -306,4 +334,15 @@ public interface KStream<K, V> {
                                                             Serializer<Long> aggValueSerializer,
                                                             Deserializer<K> keyDeserializer,
                                                             Deserializer<Long> aggValueDeserializer);
+
+    /**
+     * Count number of messages of this stream by key without a window basis, and hence
+     * return an ever-updating counting table.
+     *
+     */
+    KTable<K, Long> countByKey(Serializer<K> keySerializer,
+                               Serializer<Long> aggValueSerializer,
+                               Deserializer<K> keyDeserializer,
+                               Deserializer<Long> aggValueDeserializer,
+                               String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
new file mode 100644
index 0000000..deb98ed
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggProcessorSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public interface KStreamAggProcessorSupplier<K, RK, V, T> extends ProcessorSupplier<K, V> {
+
+    KTableValueGetterSupplier<RK, T> view();
+
+    void enableSendingOldValues();
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
deleted file mode 100644
index f02f53a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamAggWindow<K, V> implements ProcessorSupplier<K, V> {
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamAggWindowProcessor();
-    }
-
-    private class KStreamAggWindowProcessor extends AbstractProcessor<K, V> {
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-        }
-
-        @Override
-        public void process(K key, V value) {
-            // create a dummy window just for wrapping the timestamp
-            long timestamp = context().timestamp();
-
-            // send the new aggregate value
-            context().forward(new Windowed<>(key, new UnlimitedWindow(timestamp)), new Change<>(value, null));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b64277c..f41bfa6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -18,38 +18,28 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 
-import java.util.Iterator;
-import java.util.Map;
-
-public class KStreamAggregate<K, V, T, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, T> {
+public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
 
     private final String storeName;
-    private final Windows<W> windows;
     private final Initializer<T> initializer;
     private final Aggregator<K, V, T> aggregator;
 
     private boolean sendOldValues = false;
 
-    public KStreamAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
-        this.windows = windows;
+    public KStreamAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.aggregator = aggregator;
     }
 
     @Override
-    public Processor<Windowed<K>, Change<V>> get() {
+    public Processor<K, V> get() {
         return new KStreamAggregateProcessor();
     }
 
@@ -58,117 +48,68 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
         sendOldValues = true;
     }
 
-    private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+    private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
 
-        private WindowStore<K, T> windowStore;
+        private KeyValueStore<K, T> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
 
-            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
         @Override
-        public void process(Windowed<K> windowedKey, Change<V> change) {
-            // first get the matching windows
-            long timestamp = windowedKey.window().start();
-            K key = windowedKey.value();
-            V value = change.newValue;
-
-            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
-            long timeFrom = Long.MAX_VALUE;
-            long timeTo = Long.MIN_VALUE;
-
-            // use range query on window store for efficient reads
-            for (long windowStartMs : matchedWindows.keySet()) {
-                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
-                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
-            }
-
-            WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
-
-            // for each matching window, try to update the corresponding key and send to the downstream
-            while (iter.hasNext()) {
-                KeyValue<Long, T> entry = iter.next();
-                W window = matchedWindows.get(entry.key);
-
-                if (window != null) {
-
-                    T oldAgg = entry.value;
+        public void process(K key, V value) {
+            T oldAgg = store.get(key);
 
-                    if (oldAgg == null)
-                        oldAgg = initializer.apply();
+            if (oldAgg == null)
+                oldAgg = initializer.apply();
 
-                    // try to add the new new value (there will never be old value)
-                    T newAgg = aggregator.apply(key, value, oldAgg);
+            T newAgg = oldAgg;
 
-                    // update the store with the new value
-                    windowStore.put(key, newAgg, window.start());
-
-                    // forward the aggregated change pair
-                    if (sendOldValues)
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
-                    else
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
-
-                    matchedWindows.remove(entry.key);
-                }
+            // try to add the new value
+            if (value != null) {
+                newAgg = aggregator.apply(key, value, newAgg);
             }
 
-            iter.close();
-
-            // create the new window for the rest of unmatched window that do not exist yet
-            for (long windowStartMs : matchedWindows.keySet()) {
-                T oldAgg = initializer.apply();
-                T newAgg = aggregator.apply(key, value, oldAgg);
+            // update the store with the new value
+            store.put(key, newAgg);
 
-                windowStore.put(key, newAgg, windowStartMs);
-
-                // send the new aggregate pair
-                if (sendOldValues)
-                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
-                else
-                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
-            }
+            // send the old / new pair
+            if (sendOldValues)
+                context().forward(key, new Change<>(newAgg, oldAgg));
+            else
+                context().forward(key, new Change<>(newAgg, null));
         }
     }
 
     @Override
-    public KTableValueGetterSupplier<Windowed<K>, T> view() {
+    public KTableValueGetterSupplier<K, T> view() {
 
-        return new KTableValueGetterSupplier<Windowed<K>, T>() {
+        return new KTableValueGetterSupplier<K, T>() {
 
-            public KTableValueGetter<Windowed<K>, T> get() {
+            public KTableValueGetter<K, T> get() {
                 return new KStreamAggregateValueGetter();
             }
 
         };
     }
 
-    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+    private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
 
-        private WindowStore<K, T> windowStore;
+        private KeyValueStore<K, T> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
-            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
-        @SuppressWarnings("unchecked")
         @Override
-        public T get(Windowed<K> windowedKey) {
-            K key = windowedKey.value();
-            W window = (W) windowedKey.window();
-
-            // this iterator should contain at most one element
-            Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
-
-            return iter.hasNext() ? iter.next().value : null;
+        public T get(K key) {
+            return store.get(key);
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79a3115..9f384ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -385,6 +385,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String name = topology.newName(LEFTJOIN_NAME);
 
         topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
+        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
         return new KStreamImpl<>(topology, name, allSourceNodes);
     }
@@ -397,15 +398,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                  Deserializer<K> keyDeserializer,
                                                                  Deserializer<V> aggValueDeserializer) {
 
-        // TODO: this agg window operator is only used for casting K to Windowed<K> for
-        // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
         String reduceName = topology.newName(REDUCE_NAME);
-        String selectName = topology.newName(SELECT_NAME);
 
-        ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
-        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
+        KStreamWindowReduce<K, V, W> reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer);
 
-        StateStoreSupplier aggregateStore = Stores.create(windows.name())
+        StateStoreSupplier reduceStore = Stores.create(windows.name())
                 .withKeys(keySerializer, keyDeserializer)
                 .withValues(aggValueSerializer, aggValueDeserializer)
                 .persistent()
@@ -413,12 +410,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                 .build();
 
         // aggregate the values with the aggregator and local store
-        topology.addProcessor(selectName, aggWindowSupplier, this.name);
-        topology.addProcessor(reduceName, aggregateSupplier, selectName);
-        topology.addStateStore(aggregateStore, reduceName);
+        topology.addProcessor(reduceName, reduceSupplier, this.name);
+        topology.addStateStore(reduceStore, reduceName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
+    }
+
+    @Override
+    public KTable<K, V> reduceByKey(Reducer<V> reducer,
+                                    Serializer<K> keySerializer,
+                                    Serializer<V> aggValueSerializer,
+                                    Deserializer<K> keyDeserializer,
+                                    Deserializer<V> aggValueDeserializer,
+                                    String name) {
+
+        String reduceName = topology.newName(REDUCE_NAME);
+
+        KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, reducer);
+
+        StateStoreSupplier reduceStore = Stores.create(name)
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(aggValueSerializer, aggValueDeserializer)
+                .persistent()
+                .build();
+
+        // reduce the values with the reducer and local store
+        topology.addProcessor(reduceName, reduceSupplier, this.name);
+        topology.addStateStore(reduceStore, reduceName);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, reduceName, aggregateSupplier, sourceNodes);
+        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
     }
 
     @Override
@@ -430,13 +452,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                        Deserializer<K> keyDeserializer,
                                                                        Deserializer<T> aggValueDeserializer) {
 
-        // TODO: this agg window operator is only used for casting K to Windowed<K> for
-        // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
         String aggregateName = topology.newName(AGGREGATE_NAME);
-        String selectName = topology.newName(SELECT_NAME);
 
-        ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
-        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), initializer, aggregator);
+        KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator);
 
         StateStoreSupplier aggregateStore = Stores.create(windows.name())
                 .withKeys(keySerializer, keyDeserializer)
@@ -446,8 +464,34 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                 .build();
 
         // aggregate the values with the aggregator and local store
-        topology.addProcessor(selectName, aggWindowSupplier, this.name);
-        topology.addProcessor(aggregateName, aggregateSupplier, selectName);
+        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
+        topology.addStateStore(aggregateStore, aggregateName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<Windowed<K>, T, T>(topology, aggregateName, aggregateSupplier, sourceNodes);
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
+                                           Aggregator<K, V, T> aggregator,
+                                           Serializer<K> keySerializer,
+                                           Serializer<T> aggValueSerializer,
+                                           Deserializer<K> keyDeserializer,
+                                           Deserializer<T> aggValueDeserializer,
+                                           String name) {
+
+        String aggregateName = topology.newName(AGGREGATE_NAME);
+
+        KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator);
+
+        StateStoreSupplier aggregateStore = Stores.create(name)
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(aggValueSerializer, aggValueDeserializer)
+                .persistent()
+                .build();
+
+        // aggregate the values with the aggregator and local store
+        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
         topology.addStateStore(aggregateStore, aggregateName);
 
         // return the KTable representation with the intermediate topic as the sources
@@ -474,4 +518,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                     }
                 }, windows, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer);
     }
+
+    @Override
+    public     KTable<K, Long> countByKey(Serializer<K> keySerializer,
+                                          Serializer<Long> aggValueSerializer,
+                                          Deserializer<K> keyDeserializer,
+                                          Deserializer<Long> aggValueDeserializer,
+                                          String name) {
+
+        return this.aggregateByKey(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                },
+                new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate + 1L;
+                    }
+                }, keySerializer, aggValueSerializer, keyDeserializer, aggValueDeserializer, name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index c484c7b..0ec0465 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,37 +17,27 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 
-import java.util.Iterator;
-import java.util.Map;
-
-public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, V> {
+public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
 
     private final String storeName;
-    private final Windows<W> windows;
     private final Reducer<V> reducer;
 
     private boolean sendOldValues = false;
 
-    public KStreamReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
-        this.windows = windows;
+    public KStreamReduce(String storeName, Reducer<V> reducer) {
         this.storeName = storeName;
         this.reducer = reducer;
     }
 
     @Override
-    public Processor<Windowed<K>, Change<V>> get() {
-        return new KStreamAggregateProcessor();
+    public Processor<K, V> get() {
+        return new KStreamReduceProcessor();
     }
 
     @Override
@@ -55,113 +45,69 @@ public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSup
         sendOldValues = true;
     }
 
-    private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+    private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
 
-        private WindowStore<K, V> windowStore;
+        private KeyValueStore<K, V> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
 
-            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         @Override
-        public void process(Windowed<K> windowedKey, Change<V> change) {
-            // first get the matching windows
-            long timestamp = windowedKey.window().start();
-            K key = windowedKey.value();
-            V value = change.newValue;
-
-            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
-            long timeFrom = Long.MAX_VALUE;
-            long timeTo = Long.MIN_VALUE;
-
-            // use range query on window store for efficient reads
-            for (long windowStartMs : matchedWindows.keySet()) {
-                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
-                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
-            }
-
-            WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
-
-            // for each matching window, try to update the corresponding key and send to the downstream
-            while (iter.hasNext()) {
-                KeyValue<Long, V> entry = iter.next();
-                W window = matchedWindows.get(entry.key);
-
-                if (window != null) {
-
-                    V oldAgg = entry.value;
-                    V newAgg = oldAgg;
-
-                    // try to add the new new value (there will never be old value)
-                    if (newAgg == null) {
-                        newAgg = value;
-                    } else {
-                        newAgg = reducer.apply(newAgg, value);
-                    }
-
-                    // update the store with the new value
-                    windowStore.put(key, newAgg, window.start());
-
-                    // forward the aggregated change pair
-                    if (sendOldValues)
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
-                    else
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
-
-                    matchedWindows.remove(entry.key);
+        public void process(K key, V value) {
+            V oldAgg = store.get(key);
+            V newAgg = oldAgg;
+
+            // try to add the new value
+            if (value != null) {
+                if (newAgg == null) {
+                    newAgg = value;
+                } else {
+                    newAgg = reducer.apply(newAgg, value);
                 }
             }
 
-            iter.close();
+            // update the store with the new value
+            store.put(key, newAgg);
 
-            // create the new window for the rest of unmatched window that do not exist yet
-            for (long windowStartMs : matchedWindows.keySet()) {
-                windowStore.put(key, value, windowStartMs);
-
-                // send the new aggregate pair (there will be no old value)
-                context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
-            }
+            // send the old / new pair
+            if (sendOldValues)
+                context().forward(key, new Change<>(newAgg, oldAgg));
+            else
+                context().forward(key, new Change<>(newAgg, null));
         }
     }
 
     @Override
-    public KTableValueGetterSupplier<Windowed<K>, V> view() {
+    public KTableValueGetterSupplier<K, V> view() {
 
-        return new KTableValueGetterSupplier<Windowed<K>, V>() {
+        return new KTableValueGetterSupplier<K, V>() {
 
-            public KTableValueGetter<Windowed<K>, V> get() {
-                return new KStreamAggregateValueGetter();
+            public KTableValueGetter<K, V> get() {
+                return new KStreamReduceValueGetter();
             }
 
         };
     }
 
-    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+    private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
 
-        private WindowStore<K, V> windowStore;
+        private KeyValueStore<K, V> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
-            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
-        @SuppressWarnings("unchecked")
         @Override
-        public V get(Windowed<K> windowedKey) {
-            K key = windowedKey.value();
-            W window = (W) windowedKey.window();
-
-            // this iterator should only contain one element
-            Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
-
-            return iter.next().value;
+        public V get(K key) {
+            return store.get(key);
         }
-
     }
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
new file mode 100644
index 0000000..76964f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+
+    private final String storeName;
+    private final Windows<W> windows;
+    private final Initializer<T> initializer;
+    private final Aggregator<K, V, T> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamWindowAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamWindowAggregateProcessor();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            // first get the matching windows
+            long timestamp = context().timestamp();
+            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+            long timeFrom = Long.MAX_VALUE;
+            long timeTo = Long.MIN_VALUE;
+
+            // use range query on window store for efficient reads
+            for (long windowStartMs : matchedWindows.keySet()) {
+                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+            }
+
+            WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+            // for each matching window, try to update the corresponding key and send to the downstream
+            while (iter.hasNext()) {
+                KeyValue<Long, T> entry = iter.next();
+                W window = matchedWindows.get(entry.key);
+
+                if (window != null) {
+
+                    T oldAgg = entry.value;
+
+                    if (oldAgg == null)
+                        oldAgg = initializer.apply();
+
+                    // try to add the new value (there will never be an old value)
+                    T newAgg = aggregator.apply(key, value, oldAgg);
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, window.start());
+
+                    // forward the aggregated change pair
+                    if (sendOldValues)
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                    else
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+                    matchedWindows.remove(entry.key);
+                }
+            }
+
+            iter.close();
+
+            // create the remaining matched windows that do not exist in the store yet
+            for (long windowStartMs : matchedWindows.keySet()) {
+                T oldAgg = initializer.apply();
+                T newAgg = aggregator.apply(key, value, oldAgg);
+
+                windowStore.put(key, newAgg, windowStartMs);
+
+                // send the new aggregate pair
+                if (sendOldValues)
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
+                else
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, T> view() {
+
+        return new KTableValueGetterSupplier<Windowed<K>, T>() {
+
+            public KTableValueGetter<Windowed<K>, T> get() {
+                return new KStreamWindowAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T get(Windowed<K> windowedKey) {
+            K key = windowedKey.value();
+            W window = (W) windowedKey.window();
+
+            // this iterator should contain at most one element
+            Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
+
+            return iter.hasNext() ? iter.next().value : null;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
new file mode 100644
index 0000000..d532e79
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
+
+    private final String storeName;
+    private final Windows<W> windows;
+    private final Reducer<V> reducer;
+
+    private boolean sendOldValues = false;
+
+    public KStreamWindowReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.reducer = reducer;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamWindowReduceProcessor();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamWindowReduceProcessor extends AbstractProcessor<K, V> {
+
+        private WindowStore<K, V> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            // first get the matching windows
+            long timestamp = context().timestamp();
+
+            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+            long timeFrom = Long.MAX_VALUE;
+            long timeTo = Long.MIN_VALUE;
+
+            // use range query on window store for efficient reads
+            for (long windowStartMs : matchedWindows.keySet()) {
+                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+            }
+
+            WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+            // for each matching window, try to update the corresponding key and send to the downstream
+            while (iter.hasNext()) {
+                KeyValue<Long, V> entry = iter.next();
+                W window = matchedWindows.get(entry.key);
+
+                if (window != null) {
+
+                    V oldAgg = entry.value;
+                    V newAgg = oldAgg;
+
+                    // try to add the new value (there will never be an old value)
+                    if (newAgg == null) {
+                        newAgg = value;
+                    } else {
+                        newAgg = reducer.apply(newAgg, value);
+                    }
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, window.start());
+
+                    // forward the aggregated change pair
+                    if (sendOldValues)
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                    else
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+                    matchedWindows.remove(entry.key);
+                }
+            }
+
+            iter.close();
+
+            // create the remaining matched windows that do not exist in the store yet
+            for (long windowStartMs : matchedWindows.keySet()) {
+                windowStore.put(key, value, windowStartMs);
+
+                // send the new aggregate pair (there will be no old value)
+                context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, V> view() {
+
+        return new KTableValueGetterSupplier<Windowed<K>, V>() {
+
+            public KTableValueGetter<Windowed<K>, V> get() {
+                return new KStreamAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+
+        private WindowStore<K, V> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public V get(Windowed<K> windowedKey) {
+            K key = windowedKey.value();
+            W window = (W) windowedKey.window();
+            // fetch exactly the window start: the iterator should contain at most one element,
+            // and is empty when nothing is stored for this key and window, in which case return null
+            Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
+
+            return iter.hasNext() ? iter.next().value : null;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index fa4cd93..b82582b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -76,7 +76,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
 
-    public final ProcessorSupplier<K, ?> processorSupplier;
+    public final ProcessorSupplier<?, ?> processorSupplier;
 
     private final Serializer<K> keySerializer;
     private final Serializer<V> valSerializer;
@@ -87,14 +87,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     public KTableImpl(KStreamBuilder topology,
                       String name,
-                      ProcessorSupplier<K, ?> processorSupplier,
+                      ProcessorSupplier<?, ?> processorSupplier,
                       Set<String> sourceNodes) {
         this(topology, name, processorSupplier, sourceNodes, null, null, null, null);
     }
 
     public KTableImpl(KStreamBuilder topology,
                       String name,
-                      ProcessorSupplier<K, ?> processorSupplier,
+                      ProcessorSupplier<?, ?> processorSupplier,
                       Set<String> sourceNodes,
                       Serializer<K> keySerializer,
                       Serializer<V> valSerializer,
@@ -389,6 +389,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             materialize(source);
             return new KTableSourceValueGetterSupplier<>(source.topic);
+        } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
+            return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
         } else {
             return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
         }
@@ -401,6 +403,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                 KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
                 materialize(source);
                 source.enableSendingOldValues();
+            } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
+                ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
             } else {
                 ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/845c6eae/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index d647b72..df03280 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -24,5 +24,4 @@ public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, C
     KTableValueGetterSupplier<K, T> view();
 
     void enableSendingOldValues();
-
 }