You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/08 02:18:40 UTC

[2/2] kafka git commit: KAFKA-2653: Add KStream/KTable Aggregation and KTable Join APIs

KAFKA-2653: Add KStream/KTable Aggregation and KTable Join APIs

ping ymatsuda for reviews.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #730 from guozhangwang/K2653r


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40d731b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40d731b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40d731b8

Branch: refs/heads/trunk
Commit: 40d731b8712950122915795acca43886851a73b6
Parents: 4836e52
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Jan 7 17:18:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jan 7 17:18:33 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/examples/KStreamJob.java      |   2 +-
 .../kafka/streams/examples/KTableJob.java       | 111 +++++++++++++++++++
 .../kafka/streams/kstream/Aggregator.java       |  42 +++++++
 .../streams/kstream/AggregatorSupplier.java     |  23 ++++
 .../kafka/streams/kstream/HoppingWindows.java   |  79 +++++++++++++
 .../kafka/streams/kstream/JoinWindowSpec.java   |  91 ---------------
 .../kafka/streams/kstream/JoinWindows.java      | 110 ++++++++++++++++++
 .../apache/kafka/streams/kstream/KStream.java   |  89 +++++++++++++--
 .../kafka/streams/kstream/KStreamBuilder.java   |   6 +-
 .../kafka/streams/kstream/KStreamWindowed.java  |  38 -------
 .../apache/kafka/streams/kstream/KTable.java    |  90 ++++++++++++++-
 .../streams/kstream/KeyValueToDoubleMapper.java |  23 ++++
 .../streams/kstream/KeyValueToIntMapper.java    |  23 ++++
 .../streams/kstream/KeyValueToLongMapper.java   |  23 ++++
 .../kafka/streams/kstream/SlidingWindows.java   |  67 +++++++++++
 .../streams/kstream/TransformerSupplier.java    |   1 -
 .../kafka/streams/kstream/UnlimitedWindows.java |  63 +++++++++++
 .../apache/kafka/streams/kstream/Window.java    |  51 +++++++++
 .../apache/kafka/streams/kstream/Windowed.java  |  38 +++++++
 .../apache/kafka/streams/kstream/Windows.java   |  80 +++++++++++++
 .../internals/DefaultWindowedDeserializer.java  |  59 ++++++++++
 .../internals/DefaultWindowedSerializer.java    |  57 ++++++++++
 .../kstream/internals/HoppingWindow.java        |  37 +++++++
 .../streams/kstream/internals/KStreamImpl.java  | 111 +++++++++++++++----
 .../streams/kstream/internals/KTableImpl.java   |  69 ++++++++++++
 .../kstream/internals/SlidingWindow.java        |  38 +++++++
 .../streams/kstream/internals/TopKSupplier.java | 106 ++++++++++++++++++
 .../kstream/internals/UnlimitedWindow.java      |  37 +++++++
 .../streams/kstream/KStreamBuilderTest.java     |   6 +-
 .../kstream/internals/KStreamBranchTest.java    |   2 +-
 .../kstream/internals/KStreamFilterTest.java    |   4 +-
 .../kstream/internals/KStreamFlatMapTest.java   |   2 +-
 .../internals/KStreamFlatMapValuesTest.java     |   2 +-
 .../kstream/internals/KStreamImplTest.java      |  33 +++++-
 .../internals/KStreamKStreamJoinTest.java       |  20 ++--
 .../internals/KStreamKStreamLeftJoinTest.java   |  14 +--
 .../internals/KStreamKTableLeftJoinTest.java    |   4 +-
 .../kstream/internals/KStreamMapTest.java       |   2 +-
 .../kstream/internals/KStreamMapValuesTest.java |   2 +-
 .../kstream/internals/KStreamTransformTest.java |   2 +-
 .../internals/KStreamTransformValuesTest.java   |   2 +-
 41 files changed, 1456 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 819bd68..88a8955 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -45,7 +45,7 @@ public class KStreamJob {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> stream1 = builder.from("topic1");
+        KStream<String, String> stream1 = builder.stream("topic1");
 
         KStream<String, Integer> stream2 =
             stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
new file mode 100644
index 0000000..45ff58e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.examples;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KafkaStreaming;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.util.Properties;
+
+public class KTableJob {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamingConfig.JOB_ID_CONFIG, "example-ktable");
+        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        StreamingConfig config = new StreamingConfig(props);
+
+        Serializer<String> stringSerializer = new StringSerializer();
+        Deserializer<String> stringDeserializer = new StringDeserializer();
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        // stream aggregate
+        KStream<String, Long> stream1 = builder.stream("topic1");
+
+        @SuppressWarnings("unchecked")
+        KTable<Windowed<String>, Long> wtable1 = stream1.sumByKey(new KeyValueToLongMapper<String, Long>() {
+            @Override
+            public long apply(String key, Long value) {
+                return value;
+            }
+        }, HoppingWindows.of("window1").with(500L).every(500L).emit(1000L).until(1000L * 60 * 60 * 24 /* one day */), stringSerializer, stringDeserializer);
+
+        // table aggregation
+        KTable<String, String> table1 = builder.table("topic2");
+
+        KTable<String, Long> table2 = table1.sum(new KeyValueMapper<String, String, String>() {
+            @Override
+            public String apply(String key, String value) {
+                return value;
+            }
+        }, new KeyValueToLongMapper<String, String>() {
+            @Override
+            public long apply(String key, String value) {
+                return Long.parseLong(value);
+            }
+        }, stringSerializer, stringDeserializer, "table2");
+
+        // stream-table join
+        KStream<String, Long> stream2 = stream1.leftJoin(table2, new ValueJoiner<Long, Long, Long>() {
+            @Override
+            public Long apply(Long value1, Long value2) {
+                if (value2 == null)
+                    return 0L;
+                else
+                    return value1 * value2;
+            }
+        });
+
+        // table-table join
+        KTable<String, String> table3 = table1.outerJoin(table2, new ValueJoiner<String, Long, String>() {
+            @Override
+            public String apply(String value1, Long value2) {
+                if (value2 == null)
+                    return value1 + "-null";
+                else if (value1 == null)
+                    return "null-" + value2;
+                else
+                    return value1 + "-" + value2;
+            }
+        });
+
+        wtable1.to("topic3");
+
+        KafkaStreaming kstream = new KafkaStreaming(builder, config);
+        kstream.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
new file mode 100644
index 0000000..d715fbd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Aggregator<K, V, T> {
+    /**
+     * Set the initial aggregate value
+     */
+    T initialValue();
+
+    /**
+     * When a new record with the aggregate key is added,
+     * updating the aggregate value for this key
+     */
+    T add(K aggKey, V value, T aggregate);
+
+    /**
+     * when an old record with the aggregate key is removed,
+     * updating the aggregate value for this key
+     */
+    T remove(K aggKey, V value, T aggregate);
+
+    /**
+     * Merge two aggregate values
+     */
+    T merge(T aggr1, T aggr2);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
new file mode 100644
index 0000000..6ed9125
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface AggregatorSupplier<K, V, T> {
+
+    Aggregator<K, V, T> get();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
new file mode 100644
index 0000000..d7141eb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.HoppingWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class HoppingWindows extends Windows<HoppingWindow> {
+
+    private static final long DEFAULT_SIZE_MS = 1000L;
+
+    public final long size;
+
+    public final long period;
+
+    private HoppingWindows(String name, long size, long period) {
+        super(name);
+
+        this.size = size;
+        this.period = period;
+    }
+
+    /**
+     * Returns a hopping window definition using the default size and period in milliseconds,
+     * with half-open windows of the form &#91; N &#42; default_size, N &#42; default_size + default_size &#41;
+     */
+    public static HoppingWindows of(String name) {
+        return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS);
+    }
+
+    /**
+     * Returns a new hopping window definition with the original period but reassigns the window
+     * size in milliseconds, of the form &#91; N &#42; period, N &#42; period + size &#41;
+     */
+    public HoppingWindows with(long size) {
+        return new HoppingWindows(this.name, size, this.period);
+    }
+
+    /**
+     * Returns a new hopping window definition with the original size but reassigns the window
+     * period in milliseconds, of the form &#91; N &#42; period, N &#42; period + size &#41;
+     */
+    public HoppingWindows every(long period) {
+        return new HoppingWindows(this.name, this.size, period);
+    }
+
+    @Override
+    public Collection<HoppingWindow> windowsFor(long timestamp) {
+        // TODO
+        return Collections.<HoppingWindow>emptyList();
+    }
+
+    @Override
+    public boolean equalTo(Windows other) {
+        if (!other.getClass().equals(HoppingWindows.class))
+            return false;
+
+        HoppingWindows otherWindows = (HoppingWindows) other;
+
+        return this.size == otherWindows.size && this.period == otherWindows.period;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
deleted file mode 100644
index 8f0f839..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindowSpec.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * This class is used to specify the behaviour of windowed joins.
- */
-public class JoinWindowSpec {
-
-    public final String name;
-    public final long before;
-    public final long after;
-    public final long retention;
-    public final int segments;
-
-    private JoinWindowSpec(String name, long before, long after, long retention, int segments) {
-        this.name = name;
-        this.after = after;
-        this.before = before;
-        this.retention = retention;
-        this.segments = segments;
-    }
-
-    public static JoinWindowSpec of(String name) {
-        return new JoinWindowSpec(name, 0L, 0L, 0L, 3);
-    }
-
-    /**
-     * Specifies that records of the same key are joinable if their timestamp stamps are within
-     * timeDifference.
-     *
-     * @param timeDifference
-     * @return
-     */
-    public JoinWindowSpec within(long timeDifference) {
-        return new JoinWindowSpec(name, timeDifference, timeDifference, retention, segments);
-    }
-
-    /**
-     * Specifies that records of the same key are joinable if their timestamp stamps are within
-     * timeDifference, and if the timestamp of a record from the secondary stream is
-     * is earlier than or equal to the timestamp of a record from the first stream.
-     *
-     * @param timeDifference
-     * @return
-     */
-    public JoinWindowSpec before(long timeDifference) {
-        return new JoinWindowSpec(name, timeDifference, 0L, retention, segments);
-    }
-
-    /**
-     * Specifies that records of the same key are joinable if their timestamp stamps are within
-     * timeDifference, and if the timestamp of a record from the secondary stream is
-     * is later than or equal to the timestamp of a record from the first stream.
-     *
-     * @param timeDifference
-     * @return
-     */
-    public JoinWindowSpec after(long timeDifference) {
-        return new JoinWindowSpec(name, 0L, timeDifference, retention, segments);
-    }
-
-    /**
-     * Specifies the retention period of windows
-     * @param retentionPeriod
-     * @return
-     */
-    public JoinWindowSpec retentionPeriod(long retentionPeriod) {
-        return new JoinWindowSpec(name, before, after, retentionPeriod, segments);
-    }
-
-    public JoinWindowSpec segments(int segments) {
-        return new JoinWindowSpec(name, before, after, retention, segments);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
new file mode 100644
index 0000000..50aff9d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.streams.kstream.internals.SlidingWindow;
+
+import java.util.Collection;
+
+/**
+ * This class is used to specify the behaviour of windowed joins.
+ */
+public class JoinWindows extends Windows<SlidingWindow> {
+
+    private static final int DEFAULT_NUM_SEGMENTS = 3;
+
+    public final long before;
+    public final long after;
+    public final int segments;
+
+    private JoinWindows(String name, long before, long after, int segments) {
+        super(name);
+
+        this.after = after;
+        this.before = before;
+        this.segments = segments;
+    }
+
+    public static JoinWindows of(String name) {
+        return new JoinWindows(name, 0L, 0L, DEFAULT_NUM_SEGMENTS);
+    }
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within
+     * timeDifference.
+     *
+     * @param timeDifference
+     * @return
+     */
+    public JoinWindows within(long timeDifference) {
+        return new JoinWindows(this.name, timeDifference, timeDifference, this.segments);
+    }
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within
+     * timeDifference, and if the timestamp of a record from the secondary stream is
+     * earlier than or equal to the timestamp of a record from the first stream.
+     *
+     * @param timeDifference
+     * @return
+     */
+    public JoinWindows before(long timeDifference) {
+        return new JoinWindows(this.name, timeDifference, this.after, this.segments);
+    }
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within
+     * timeDifference, and if the timestamp of a record from the secondary stream is
+     * later than or equal to the timestamp of a record from the first stream.
+     *
+     * @param timeDifference
+     * @return
+     */
+    public JoinWindows after(long timeDifference) {
+        return new JoinWindows(this.name, this.before, timeDifference, this.segments);
+    }
+
+    /**
+     * Specifies the number of segments to be used for rolling the window store.
+     * This method is not exposed to users, but can be called by developers who extend JoinWindows.
+     *
+     * @param segments
+     * @return
+     */
+    protected JoinWindows segments(int segments) {
+        return new JoinWindows(name, before, after, segments);
+    }
+
+    @Override
+    public Collection<SlidingWindow> windowsFor(long timestamp) {
+        // this function should never be called
+        throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
+    }
+
+    @Override
+    public boolean equalTo(Windows other) {
+        if (!other.getClass().equals(JoinWindows.class))
+            return false;
+
+        JoinWindows otherWindows = (JoinWindows) other;
+
+        return this.before == otherWindows.before && this.after == otherWindows.after;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 29115c7..dace7e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
+import java.util.Collection;
+
 /**
  * KStream is an abstraction of a stream of key-value pairs.
  *
@@ -141,7 +143,7 @@ public interface KStream<K, V> {
     /**
      * Applies a stateful transformation to all elements in this stream.
      *
-     * @param transformerSupplier the class of TransformerDef
+     * @param transformerSupplier the class of TransformerSupplier
      * @param stateStoreNames the names of the state store used by the processor
      * @return the instance of KStream that contains transformed keys and values
      */
@@ -150,7 +152,7 @@ public interface KStream<K, V> {
     /**
      * Applies a stateful transformation to all values in this stream.
      *
-     * @param valueTransformerSupplier the class of TransformerDef
+     * @param valueTransformerSupplier the class of valueTransformerSupplier
      * @param stateStoreNames the names of the state store used by the processor
      * @return the instance of KStream that contains the keys and transformed values
      */
@@ -169,7 +171,7 @@ public interface KStream<K, V> {
      *
      * @param otherStream the instance of KStream joined with this stream
      * @param joiner ValueJoiner
-     * @param joinWindowSpec the specification of the join window
+     * @param windows the specification of the join window
      * @param keySerializer key serializer,
      *                      if not specified the default serializer defined in the configs will be used
      * @param thisValueSerializer value serializer for this stream,
@@ -188,7 +190,7 @@ public interface KStream<K, V> {
     <V1, V2> KStream<K, V2> join(
             KStream<K, V1> otherStream,
             ValueJoiner<V, V1, V2> joiner,
-            JoinWindowSpec joinWindowSpec,
+            JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V> thisValueSerializer,
             Serializer<V1> otherValueSerializer,
@@ -201,7 +203,7 @@ public interface KStream<K, V> {
      *
      * @param otherStream the instance of KStream joined with this stream
      * @param joiner ValueJoiner
-     * @param joinWindowSpec the specification of the join window
+     * @param windows the specification of the join window
      * @param keySerializer key serializer,
      *                      if not specified the default serializer defined in the configs will be used
      * @param thisValueSerializer value serializer for this stream,
@@ -220,7 +222,7 @@ public interface KStream<K, V> {
     <V1, V2> KStream<K, V2> outerJoin(
             KStream<K, V1> otherStream,
             ValueJoiner<V, V1, V2> joiner,
-            JoinWindowSpec joinWindowSpec,
+            JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V> thisValueSerializer,
             Serializer<V1> otherValueSerializer,
@@ -233,6 +235,7 @@ public interface KStream<K, V> {
      *
      * @param otherStream the instance of KStream joined with this stream
      * @param joiner ValueJoiner
+     * @param windows the specification of the join window
      * @param keySerializer key serializer,
      *                      if not specified the default serializer defined in the configs will be used
      * @param otherValueSerializer value serializer for other stream,
@@ -247,7 +250,7 @@ public interface KStream<K, V> {
     <V1, V2> KStream<K, V2> leftJoin(
             KStream<K, V1> otherStream,
             ValueJoiner<V, V1, V2> joiner,
-            JoinWindowSpec joinWindowSpec,
+            JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V1> otherValueSerializer,
             Deserializer<K> keyDeserializer,
@@ -258,9 +261,79 @@ public interface KStream<K, V> {
      *
      * @param ktable the instance of KTable joined with this stream
      * @param joiner ValueJoiner
-     * @param <V1>   the value type of the other stream
+     * @param <V1>   the value type of the table
      * @param <V2>   the value type of the new stream
      */
     <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner);
 
+    /**
+     * Aggregate values of this stream by key on a window basis.
+     *
+     * @param aggregatorSupplier the class of aggregatorSupplier
+     * @param windows the specification of the aggregation window
+     * @param <T>   the value type of the aggregated table
+     */
+    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+                                                                Windows<W> windows,
+                                                                Serializer<K> keySerializer,
+                                                                Serializer<T> aggValueSerializer,
+                                                                Deserializer<K> keyDeserializer,
+                                                                Deserializer<T> aggValueDeserializer);
+
+    /**
+     * Sum extracted long integer values of this stream by key on a window basis.
+     *
+     * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
+     * @param windows the specification of the aggregation window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
+                                                          Windows<W> windows,
+                                                          Serializer<K> keySerializer,
+                                                          Deserializer<K> keyDeserializer);
+
+    /**
+     * Sum extracted integer values of this stream by key on a window basis.
+     *
+     * @param valueSelector the class of KeyValueToIntMapper to extract the integer from value
+     * @param windows the specification of the aggregation window
+     */
+    <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
+                                                             Windows<W> windows,
+                                                             Serializer<K> keySerializer,
+                                                             Deserializer<K> keyDeserializer);
+
+    /**
+     * Sum extracted double decimal values of this stream by key on a window basis.
+     *
+     * @param valueSelector the class of KeyValueToDoubleMapper to extract the double decimal from value
+     * @param windows the specification of the aggregation window
+     */
+    <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
+                                                            Windows<W> windows,
+                                                            Serializer<K> keySerializer,
+                                                            Deserializer<K> keyDeserializer);
+
+    /**
+     * Count number of records of this stream by key on a window basis.
+     *
+     * @param windows the specification of the aggregation window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+                                                            Serializer<K> keySerializer,
+                                                            Deserializer<K> keyDeserializer);
+
+    /**
+     * Get the top-k values of this stream by key on a window basis.
+     *
+     * @param k parameter of the top-k computation
+     * @param valueSelector the class of KeyValueMapper to extract the comparable value
+     * @param windows the specification of the aggregation window
+     */
+    <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
+                                                                                                KeyValueMapper<K, V, V1> valueSelector,
+                                                                                                Windows<W> windows,
+                                                                                                Serializer<K> keySerializer,
+                                                                                                Serializer<V1> aggValueSerializer,
+                                                                                                Deserializer<K> keyDeserializer,
+                                                                                                Deserializer<V1> aggValueDeserializer);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 32d3a9d..b50cffb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -46,8 +46,8 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param topics          the topic names, if empty default to all the topics in the config
      * @return KStream
      */
-    public <K, V> KStream<K, V> from(String... topics) {
-        return from(null, null, topics);
+    public <K, V> KStream<K, V> stream(String... topics) {
+        return stream(null, null, topics);
     }
 
     /**
@@ -60,7 +60,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param topics          the topic names, if empty default to all the topics in the config
      * @return KStream
      */
-    public <K, V> KStream<K, V> from(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) {
+    public <K, V> KStream<K, V> stream(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) {
         String name = newName(KStreamImpl.SOURCE_NAME);
 
         addSource(name, keyDeserializer, valDeserializer, topics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
deleted file mode 100644
index 4d73128..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * KStreamWindowed is an abstraction of a stream of key-value pairs with a window.
- */
-public interface KStreamWindowed<K, V> extends KStream<K, V> {
-
-    /**
-     * Creates a new stream by joining this windowed stream with the other windowed stream.
-     * Each element arrived from either of the streams is joined with elements in a window of each other.
-     * The resulting values are computed by applying a joiner.
-     *
-     * @param other  the other windowed stream
-     * @param joiner ValueJoiner
-     * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
-     * @return KStream
-     */
-    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index c6e7975..997edcd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,6 +20,8 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.Collection;
+
 /**
  * KTable is an abstraction of a change log stream.
  *
@@ -112,7 +114,7 @@ public interface KTable<K, V> {
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
      * @param <V2>   the value type of the new stream
-     * @return the instance of KStream
+     * @return the instance of KTable
      */
     <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
 
@@ -123,7 +125,7 @@ public interface KTable<K, V> {
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
      * @param <V2>   the value type of the new stream
-     * @return the instance of KStream
+     * @return the instance of KTable
      */
     <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
 
@@ -134,8 +136,90 @@ public interface KTable<K, V> {
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
      * @param <V2>   the value type of the new stream
-     * @return the instance of KStream
+     * @return the instance of KTable
      */
     <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
 
+    /**
+     * Aggregate values of this table by the selected key.
+     *
+     * @param aggregatorSupplier the class of AggregatorSupplier
+     * @param selector the KeyValueMapper that maps each record to the aggregate key (paired with a value of type V1)
+     * @param name the name of the resulted table
+     * @param <K1>   the key type of the aggregated table
+     * @param <V2>   the value type of the aggregated table
+     * @return the instance of KTable
+     */
+    <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
+                                          KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                          Serializer<K> keySerializer,
+                                          Serializer<V2> aggValueSerializer,
+                                          Deserializer<K> keyDeserializer,
+                                          Deserializer<V2> aggValueDeserializer,
+                                          String name);
+
+    /**
+     * Sum extracted long integer values of this table by the selected aggregation key
+     *
+     * @param keySelector the class of KeyValueMapper to select the aggregation key
+     * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
+     * @param name the name of the resulted table
+     */
+    <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
+                              KeyValueToLongMapper<K, V> valueSelector,
+                              Serializer<K> keySerializer,
+                              Deserializer<K> keyDeserializer,
+                              String name);
+
+    /**
+     * Sum extracted integer values of this table by the selected aggregation key
+     *
+     * @param keySelector the class of KeyValueMapper to select the aggregation key
+     * @param valueSelector the class of KeyValueToIntMapper to extract the integer from value
+     * @param name the name of the resulted table
+     */
+    <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
+                                 KeyValueToIntMapper<K, V> valueSelector,
+                                 Serializer<K> keySerializer,
+                                 Deserializer<K> keyDeserializer,
+                                 String name);
+
+    /**
+     * Sum extracted double decimal values of this table by the selected aggregation key
+     *
+     * @param keySelector the class of KeyValueMapper to select the aggregation key
+     * @param valueSelector the class of KeyValueToDoubleMapper to extract the double from value
+     * @param name the name of the resulted table
+     */
+    <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
+                                KeyValueToDoubleMapper<K, V> valueSelector,
+                                Serializer<K> keySerializer,
+                                Deserializer<K> keyDeserializer,
+                                String name);
+
+    /**
+     * Count number of records of this table by the selected aggregation key
+     *
+     * @param keySelector the class of KeyValueMapper to select the aggregation key
+     * @param name the name of the resulted table
+     */
+    <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
+                                Serializer<K> keySerializer,
+                                Deserializer<K> keyDeserializer,
+                                String name);
+
+    /**
+     * Get the top-k values of this table by the selected aggregation key
+     *
+     * @param k parameter of the top-k computation
+     * @param keySelector the class of KeyValueMapper to select the aggregation key
+     * @param name the name of the resulted table
+     */
+    <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
+                                                                    KeyValueMapper<K, V, K1> keySelector,
+                                                                    Serializer<K> keySerializer,
+                                                                    Serializer<V1> aggValueSerializer,
+                                                                    Deserializer<K> keyDeserializer,
+                                                                    Deserializer<V1> aggValueDeserializer,
+                                                                    String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
new file mode 100644
index 0000000..ae3b858
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface KeyValueToDoubleMapper<K, V> {
+    /** Maps the given key and value to a {@code double}. */
+    double apply(K key, V value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
new file mode 100644
index 0000000..72e5ee9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface KeyValueToIntMapper<K, V> {
+    /** Maps the given key and value to an {@code int}. */
+    int apply(K key, V value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
new file mode 100644
index 0000000..3a8d8a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface KeyValueToLongMapper<K, V> {
+    /** Maps the given key and value to a {@code long}. */
+    long apply(K key, V value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
new file mode 100644
index 0000000..ffdb4ad
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.streams.kstream.internals.SlidingWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class SlidingWindows extends Windows<SlidingWindow> {
+
+    // Window size, in milliseconds, used when none is given explicitly.
+    private static final long DEFAULT_SIZE_MS = 1000L;
+    public final long size;
+
+    private SlidingWindows(String name, long size) {
+        super(name);
+        this.size = size;
+    }
+
+    /**
+     * Creates a sliding window definition with the given name and the default window size.
+     */
+    public static SlidingWindows of(String name) {
+        return new SlidingWindows(name, DEFAULT_SIZE_MS);
+    }
+
+    /**
+     * Creates a new sliding window definition that keeps this definition's name
+     * and uses the given window size in milliseconds.
+     */
+    public SlidingWindows with(long size) {
+        return new SlidingWindows(name, size);
+    }
+
+    /** Not implemented yet: currently yields no windows for any timestamp. */
+    @Override
+    public Collection<SlidingWindow> windowsFor(long timestamp) {
+        // TODO
+        return Collections.<SlidingWindow>emptyList();
+    }
+
+    /** Returns true only if {@code other} is exactly a SlidingWindows of the same size. */
+    @Override
+    public boolean equalTo(Windows other) {
+        if (other.getClass() != SlidingWindows.class) {
+            return false;
+        }
+        return size == ((SlidingWindows) other).size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
index 2c2d8dd..93d930d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -20,5 +20,4 @@ package org.apache.kafka.streams.kstream;
 public interface TransformerSupplier<K, V, R> {
 
     Transformer<K, V, R> get();
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
new file mode 100644
index 0000000..89cb0a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class UnlimitedWindows extends Windows<UnlimitedWindow> {
+
+    // Start timestamp used when none is given explicitly.
+    private static final long DEFAULT_START_TIMESTAMP = 0L;
+    public final long start;
+
+    private UnlimitedWindows(String name, long start) {
+        super(name);
+        this.start = start;
+    }
+
+    /**
+     * Creates an unlimited window definition with the given name and the default start timestamp.
+     */
+    public static UnlimitedWindows of(String name) {
+        return new UnlimitedWindows(name, DEFAULT_START_TIMESTAMP);
+    }
+
+    /** Creates a new definition that keeps this definition's name and starts at {@code start}. */
+    public UnlimitedWindows startOn(long start) {
+        return new UnlimitedWindows(name, start);
+    }
+
+    /** Not implemented yet: currently yields no windows for any timestamp. */
+    @Override
+    public Collection<UnlimitedWindow> windowsFor(long timestamp) {
+        // TODO
+        return Collections.<UnlimitedWindow>emptyList();
+    }
+
+    /** Returns true only if {@code other} is exactly an UnlimitedWindows with the same start. */
+    @Override
+    public boolean equalTo(Windows other) {
+        if (other.getClass() != UnlimitedWindows.class) {
+            return false;
+        }
+        return start == ((UnlimitedWindows) other).start;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
new file mode 100644
index 0000000..63e0a35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+/** A time window covering the half-open interval [start, end). */
+public abstract class Window {
+
+    private final long start;
+    private final long end;
+
+    public Window(long start, long end) {
+        this.start = start;
+        this.end = end;
+    }
+
+    /** Returns the start timestamp of this window, inclusive */
+    public long start() {
+        return start;
+    }
+
+    /** Returns the end timestamp of this window, exclusive */
+    public long end() {
+        return end;
+    }
+
+    /** Returns true if the intervals of this window and {@code other} intersect. */
+    public boolean overlap(Window other) {
+        // both conditions must hold; with "||" every pair of non-empty windows would overlap
+        return this.start() < other.end() && other.start() < this.end();
+    }
+
+    /** Returns true if both windows have the same start and end timestamps. */
+    public boolean equalsTo(Window other) {
+        return this.start() == other.start() && this.end() == other.end();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
new file mode 100644
index 0000000..03fb656
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class Windowed<T> {
+    private final T value;
+    private final Window window;
+
+    public Windowed(T value, Window window) {
+        this.value = value;
+        this.window = window;
+    }
+
+    /** Returns the wrapped value. */
+    public T value() {
+        return value;
+    }
+
+    /** Returns the window associated with the value. */
+    public Window window() {
+        return window;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
new file mode 100644
index 0000000..ab8d822
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Base class for window definitions; carries a name plus emit and maintain durations. */
+public abstract class Windows<W extends Window> {
+
+    private static final long DEFAULT_EMIT_DURATION = 1000L;   // one second
+    private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
+
+    private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
+
+    private long emitDuration;
+    private long maintainDuration;
+
+    protected String name;
+
+    protected Windows(String name) {
+        this.name = name;
+        this.emitDuration = DEFAULT_EMIT_DURATION;
+        this.maintainDuration = DEFAULT_MAINTAIN_DURATION;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Set the window emit duration in milliseconds of system time; returns this instance for chaining
+     */
+    public Windows<W> emit(long duration) {
+        this.emitDuration = duration;
+        return this;
+    }
+
+    /**
+     * Set the window maintain duration in milliseconds of system time; returns this instance for chaining
+     */
+    public Windows<W> until(long duration) {
+        this.maintainDuration = duration;
+        return this;
+    }
+
+    /** Returns the emit duration in milliseconds. */
+    public long emitEveryMs() {
+        return this.emitDuration;
+    }
+
+    /** Returns the maintain duration in milliseconds. */
+    public long maintainMs() {
+        return this.maintainDuration;
+    }
+
+    /** Returns the prefix followed by the next value of a shared counter, zero-padded to 10 digits. */
+    protected String newName(String prefix) {
+        return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
+    }
+
+    abstract boolean equalTo(Windows other);
+
+    abstract Collection<W> windowsFor(long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
new file mode 100644
index 0000000..9a14c53
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedDeserializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class DefaultWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
+
+    private static final int TIMESTAMP_SIZE = 8;   // trailing bytes holding the window start (one long)
+
+    private final Deserializer<T> inner;
+
+    public DefaultWindowedDeserializer(Deserializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // do nothing
+    }
+
+    /** Splits data into the inner payload and the trailing window start; null data yields null. */
+    @Override
+    public Windowed<T> deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+
+        byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
+        System.arraycopy(data, 0, bytes, 0, bytes.length);
+        long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
+
+        // always read as unlimited window
+        return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
new file mode 100644
index 0000000..4bf2b28
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/DefaultWindowedSerializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class DefaultWindowedSerializer<T> implements Serializer<Windowed<T>> {
+
+    private static final int TIMESTAMP_SIZE = 8;   // bytes taken by the appended window start (one long)
+
+    private Serializer<T> inner;
+
+    public DefaultWindowedSerializer(Serializer<T> inner) {
+        this.inner = inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // do nothing
+    }
+
+    /** Writes the inner-serialized value followed by the window start timestamp. */
+    @Override
+    public byte[] serialize(String topic, Windowed<T> data) {
+        byte[] payload = inner.serialize(topic, data.value());
+
+        return ByteBuffer.allocate(payload.length + TIMESTAMP_SIZE)
+                .put(payload)
+                .putLong(data.window().start())
+                .array();
+    }
+
+    /** Closes the inner serializer. */
+    @Override
+    public void close() {
+        inner.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
new file mode 100644
index 0000000..8b0b2fb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+
+/**
+ * {@link Window} subclass whose comparisons only succeed against windows whose
+ * runtime class is exactly {@code HoppingWindow}.
+ */
+public class HoppingWindow extends Window {
+
+    /**
+     * @param start passed unchanged to the {@link Window} constructor
+     * @param end   passed unchanged to the {@link Window} constructor
+     */
+    public HoppingWindow(long start, long end) {
+        super(start, end);
+    }
+
+    /**
+     * @return {@code true} only if {@link Window#overlap(Window)} holds and {@code other}
+     *         is exactly a {@code HoppingWindow} (subclasses and other window types do not match)
+     */
+    @Override
+    public boolean overlap(Window other) {
+        // The base-class check runs first, exactly as in the short-circuit form.
+        if (!super.overlap(other)) {
+            return false;
+        }
+        return other.getClass() == HoppingWindow.class;
+    }
+
+    /**
+     * @return {@code true} only if {@link Window#equalsTo(Window)} holds and {@code other}
+     *         is exactly a {@code HoppingWindow} (subclasses and other window types do not match)
+     */
+    @Override
+    public boolean equalsTo(Window other) {
+        // The base-class check runs first, exactly as in the short-circuit form.
+        if (!super.equalsTo(other)) {
+            return false;
+        }
+        return other.getClass() == HoppingWindow.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index f47fe0f..4505e74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -19,10 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.JoinWindowSpec;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
+import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
+import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -30,11 +34,15 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.Serdes;
 
 import java.lang.reflect.Array;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -62,6 +70,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
+    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+
     public static final String SINK_NAME = "KSTREAM-SINK-";
 
     public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
@@ -187,7 +197,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                  Deserializer<V> valDeserializer) {
         to(topic, keySerializer, valSerializer);
 
-        return topology.from(keyDeserializer, valDeserializer, topic);
+        return topology.stream(keyDeserializer, valDeserializer, topic);
     }
 
     @Override
@@ -239,7 +249,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <V1, R> KStream<K, R> join(
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
-            JoinWindowSpec joinWindowSpec,
+            JoinWindows windows,
             Serializer<K> keySerialzier,
             Serializer<V> thisValueSerialzier,
             Serializer<V1> otherValueSerialzier,
@@ -247,7 +257,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             Deserializer<V> thisValueDeserialzier,
             Deserializer<V1> otherValueDeserialzier) {
 
-        return join(other, joiner, joinWindowSpec,
+        return join(other, joiner, windows,
                 keySerialzier, thisValueSerialzier, otherValueSerialzier,
                 keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, false);
     }
@@ -256,7 +266,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <V1, R> KStream<K, R> outerJoin(
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
-            JoinWindowSpec joinWindowSpec,
+            JoinWindows windows,
             Serializer<K> keySerialzier,
             Serializer<V> thisValueSerialzier,
             Serializer<V1> otherValueSerialzier,
@@ -264,7 +274,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             Deserializer<V> thisValueDeserialzier,
             Deserializer<V1> otherValueDeserialzier) {
 
-        return join(other, joiner, joinWindowSpec,
+        return join(other, joiner, windows,
                 keySerialzier, thisValueSerialzier, otherValueSerialzier,
                 keyDeserialier, thisValueDeserialzier, otherValueDeserialzier, true);
     }
@@ -273,7 +283,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     private <V1, R> KStream<K, R> join(
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
-            JoinWindowSpec joinWindowSpec,
+            JoinWindows windows,
             Serializer<K> keySerialzier,
             Serializer<V> thisValueSerialzier,
             Serializer<V1> otherValueSerialzier,
@@ -286,21 +296,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         RocksDBWindowStoreSupplier<K, V> thisWindow =
                 new RocksDBWindowStoreSupplier<>(
-                        joinWindowSpec.name + "-1",
-                        joinWindowSpec.before,
-                        joinWindowSpec.after,
-                        joinWindowSpec.retention,
-                        joinWindowSpec.segments,
+                        windows.name() + "-this",
+                        windows.before,
+                        windows.after,
+                        windows.maintainMs(),
+                        windows.segments,
                         new Serdes<>("", keySerialzier, keyDeserialier, thisValueSerialzier, thisValueDeserialzier),
                         null);
 
         RocksDBWindowStoreSupplier<K, V1> otherWindow =
                 new RocksDBWindowStoreSupplier<>(
-                        joinWindowSpec.name + "-2",
-                        joinWindowSpec.after,
-                        joinWindowSpec.before,
-                        joinWindowSpec.retention,
-                        joinWindowSpec.segments,
+                        windows.name() + "-other",
+                        windows.before,
+                        windows.after,
+                        windows.maintainMs(),
+                        windows.segments,
                         new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
                         null);
 
@@ -333,7 +343,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <V1, R> KStream<K, R> leftJoin(
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
-            JoinWindowSpec joinWindowSpec,
+            JoinWindows windows,
             Serializer<K> keySerialzier,
             Serializer<V1> otherValueSerialzier,
             Deserializer<K> keyDeserialier,
@@ -343,11 +353,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         RocksDBWindowStoreSupplier<K, V1> otherWindow =
                 new RocksDBWindowStoreSupplier<>(
-                        joinWindowSpec.name,
-                        joinWindowSpec.after,
-                        joinWindowSpec.before,
-                        joinWindowSpec.retention,
-                        joinWindowSpec.segments,
+                        windows.name() + "-this",
+                        windows.before,
+                        windows.after,
+                        windows.maintainMs(),
+                        windows.segments,
                         new Serdes<>("", keySerialzier, keyDeserialier, otherValueSerialzier, otherValueDeserialzier),
                         null);
 
@@ -376,4 +386,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(topology, name, allSourceNodes);
     }
 
+    /**
+     * Stub: not implemented yet. Ignores all arguments and returns {@code null}.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+                                                                       Windows<W> windows,
+                                                                       Serializer<K> keySerializer,
+                                                                       Serializer<T> aggValueSerializer,
+                                                                       Deserializer<K> keyDeserializer,
+                                                                       Deserializer<T> aggValueDeserializer) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub for the {@code KeyValueToLongMapper} overload: not implemented yet.
+     * Ignores all arguments and returns {@code null}.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
+                                                                 Windows<W> windows,
+                                                                 Serializer<K> keySerializer,
+                                                                 Deserializer<K> keyDeserializer) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub for the {@code KeyValueToIntMapper} overload: not implemented yet.
+     * Ignores all arguments and returns {@code null}.
+     *
+     * <p>NOTE(review): unlike the {@code KeyValueToLongMapper} overload, this method carries
+     * no {@code @Override} -- confirm whether the interface declares it.
+     *
+     * @return always {@code null} for now
+     */
+    public <W extends Window> KTable<Windowed<K>, Integer> sumByKey(KeyValueToIntMapper<K, V> valueSelector,
+                                                                    Windows<W> windows,
+                                                                    Serializer<K> keySerializer,
+                                                                    Deserializer<K> keyDeserializer) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub for the {@code KeyValueToDoubleMapper} overload: not implemented yet.
+     * Ignores all arguments and returns {@code null}.
+     *
+     * <p>NOTE(review): unlike the {@code KeyValueToLongMapper} overload, this method carries
+     * no {@code @Override} -- confirm whether the interface declares it.
+     *
+     * @return always {@code null} for now
+     */
+    public <W extends Window> KTable<Windowed<K>, Double> sumByKey(KeyValueToDoubleMapper<K, V> valueSelector,
+                                                                   Windows<W> windows,
+                                                                   Serializer<K> keySerializer,
+                                                                   Deserializer<K> keyDeserializer) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub: not implemented yet. Ignores all arguments and returns {@code null}.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
+                                                                   Serializer<K> keySerializer,
+                                                                   Deserializer<K> keyDeserializer) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub: not implemented yet. Ignores all arguments (including {@code k}) and
+     * returns {@code null}.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <W extends Window, V1 extends Comparable<V1>> KTable<Windowed<K>, Collection<V1>> topKByKey(int k,
+                                                                                                       KeyValueMapper<K, V, V1> valueSelector,
+                                                                                                       Windows<W> windows,
+                                                                                                       Serializer<K> keySerializer,
+                                                                                                       Serializer<V1> aggValueSerializer,
+                                                                                                       Deserializer<K> keyDeserializer,
+                                                                                                       Deserializer<V1> aggValueDeserializer) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9f97958..32d3cc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,15 +19,22 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper;
+import org.apache.kafka.streams.kstream.KeyValueToIntMapper;
+import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
+import java.util.Collection;
 import java.util.Set;
 
 /**
@@ -271,4 +278,66 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
     }
 
+    /**
+     * Stub: not implemented yet. Ignores all arguments and returns {@code null}.
+     *
+     * <p>NOTE(review): the key serializer/deserializer are typed on {@code K} while the
+     * returned table is keyed by {@code K1} -- confirm this is intended.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
+                                                 KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                                 Serializer<K> keySerializer,
+                                                 Serializer<V2> aggValueSerializer,
+                                                 Deserializer<K> keyDeserializer,
+                                                 Deserializer<V2> aggValueDeserializer,
+                                                 String name) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub for the {@code KeyValueToLongMapper} overload: not implemented yet.
+     * Ignores all arguments and returns {@code null}.
+     *
+     * <p>NOTE(review): the key serializer/deserializer are typed on {@code K} while the
+     * returned table is keyed by {@code K1} -- confirm this is intended.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
+                                     KeyValueToLongMapper<K, V> valueSelector,
+                                     Serializer<K> keySerializer,
+                                     Deserializer<K> keyDeserializer,
+                                     String name) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub for the {@code KeyValueToIntMapper} overload: not implemented yet.
+     * Ignores all arguments and returns {@code null}.
+     *
+     * <p>NOTE(review): the key serializer/deserializer are typed on {@code K} while the
+     * returned table is keyed by {@code K1} -- confirm this is intended.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <K1> KTable<K1, Integer> sum(KeyValueMapper<K, V, K1> keySelector,
+                                        KeyValueToIntMapper<K, V> valueSelector,
+                                        Serializer<K> keySerializer,
+                                        Deserializer<K> keyDeserializer,
+                                        String name) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub for the {@code KeyValueToDoubleMapper} overload: not implemented yet.
+     * Ignores all arguments and returns {@code null}.
+     *
+     * <p>NOTE(review): the key serializer/deserializer are typed on {@code K} while the
+     * returned table is keyed by {@code K1} -- confirm this is intended.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <K1> KTable<K1, Double> sum(KeyValueMapper<K, V, K1> keySelector,
+                                       KeyValueToDoubleMapper<K, V> valueSelector,
+                                       Serializer<K> keySerializer,
+                                       Deserializer<K> keyDeserializer,
+                                       String name) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub: not implemented yet. Ignores all arguments and returns {@code null}.
+     *
+     * <p>NOTE(review): the key serializer/deserializer are typed on {@code K} while the
+     * returned table is keyed by {@code K1} -- confirm this is intended.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
+                                       Serializer<K> keySerializer,
+                                       Deserializer<K> keyDeserializer,
+                                       String name) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
+
+    /**
+     * Stub: not implemented yet. Ignores all arguments (including {@code k}) and
+     * returns {@code null}.
+     *
+     * <p>NOTE(review): the key serializer/deserializer are typed on {@code K} while the
+     * returned table is keyed by {@code K1}, and {@code V1} appears only in the value
+     * serializer/deserializer parameters (no parameter maps values to {@code V1}) --
+     * confirm this is intended.
+     *
+     * @return always {@code null} for now
+     */
+    @Override
+    public <K1, V1 extends Comparable<V1>> KTable<K1, Collection<V1>> topK(int k,
+                                                                           KeyValueMapper<K, V, K1> keySelector,
+                                                                           Serializer<K> keySerializer,
+                                                                           Serializer<V1> aggValueSerializer,
+                                                                           Deserializer<K> keyDeserializer,
+                                                                           Deserializer<V1> aggValueDeserializer,
+                                                                           String name) {
+        // TODO: implement; until then callers receive null instead of a KTable
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40d731b8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
new file mode 100644
index 0000000..a6b5149
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.streams.kstream.Window;
+
+/**
+ * {@link Window} subclass whose comparisons only succeed against windows whose
+ * runtime class is exactly {@code SlidingWindow}.
+ */
+public class SlidingWindow extends Window {
+
+    /**
+     * @param start passed unchanged to the {@link Window} constructor
+     * @param end   passed unchanged to the {@link Window} constructor
+     */
+    public SlidingWindow(long start, long end) {
+        super(start, end);
+    }
+
+    /**
+     * @return {@code true} only if {@link Window#overlap(Window)} holds and {@code other}
+     *         is exactly a {@code SlidingWindow} (subclasses and other window types do not match)
+     */
+    @Override
+    public boolean overlap(Window other) {
+        // The base-class check runs first, exactly as in the short-circuit form.
+        if (!super.overlap(other)) {
+            return false;
+        }
+        return other.getClass() == SlidingWindow.class;
+    }
+
+    /**
+     * @return {@code true} only if {@link Window#equalsTo(Window)} holds and {@code other}
+     *         is exactly a {@code SlidingWindow} (subclasses and other window types do not match)
+     */
+    @Override
+    public boolean equalsTo(Window other) {
+        // The base-class check runs first, exactly as in the short-circuit form.
+        if (!super.equalsTo(other)) {
+            return false;
+        }
+        return other.getClass() == SlidingWindow.class;
+    }
+}