You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/18 21:14:55 UTC

[1/2] kafka git commit: KAFKA-3104: add windowed aggregation to KStream

Repository: kafka
Updated Branches:
  refs/heads/trunk cc3570d1a -> a62eb5993


http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
index 5189318..2f30712 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
@@ -20,6 +20,7 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.text.SimpleDateFormat;
@@ -34,6 +35,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
 
+    private static final long USE_CURRENT_TIMESTAMP = -1L;
+
     private static class Segment extends RocksDBStore<byte[], byte[]> {
         public final long id;
 
@@ -73,11 +76,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         @Override
-        public V next() {
+        public KeyValue<Long, V> next() {
             if (index >= iterators.length)
                 throw new NoSuchElementException();
 
-            return serdes.valueFrom(iterators[index].next().value());
+            Entry<byte[], byte[]> entry = iterators[index].next();
+
+            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
+                                  serdes.valueFrom(entry.value()));
         }
 
         @Override
@@ -86,6 +92,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 iterators[index].remove();
         }
 
+        @Override
         public void close() {
             for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
                 iterator.close();
@@ -94,9 +101,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     private final String name;
-    private final long windowBefore;
-    private final long windowAfter;
     private final long segmentInterval;
+    private final boolean retainDuplicates;
     private final Segment[] segments;
     private final Serdes<K, V> serdes;
     private final SimpleDateFormat formatter;
@@ -105,14 +111,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private long currentSegmentId = -1L;
     private int seqnum = 0;
 
-    public RocksDBWindowStore(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes) {
+    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
         this.name = name;
-        this.windowBefore = windowBefore;
-        this.windowAfter = windowAfter;
-
-        // The retention period must be at least two times as long as the total window size
-        if ((this.windowBefore + this.windowAfter + 1) * 2 > retentionPeriod)
-            retentionPeriod = (this.windowBefore + this.windowAfter + 1) * 2;
 
         // The segment interval must be greater than MIN_SEGMENT_INTERVAL
         this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
@@ -120,6 +120,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         this.segments = new Segment[numSegments];
         this.serdes = serdes;
 
+        this.retainDuplicates = retainDuplicates;
+
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
         this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
@@ -158,12 +160,18 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        putAndReturnInternalKey(key, value);
+        putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
     }
 
     @Override
-    public byte[] putAndReturnInternalKey(K key, V value) {
-        long timestamp = context.timestamp();
+    public void put(K key, V value, long timestamp) {
+        putAndReturnInternalKey(key, value, timestamp);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value, long t) {
+        long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
+
         long segmentId = segmentId(timestamp);
 
         if (segmentId > currentSegmentId) {
@@ -174,7 +182,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         // If the record is within the retention period, put it in the store.
         if (segmentId > currentSegmentId - segments.length) {
-            seqnum = (seqnum + 1) & 0x7FFFFFFF;
+            if (retainDuplicates)
+                seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
             getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
             return binaryKey;
@@ -213,10 +222,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public WindowStoreIterator<V> fetch(K key, long timestamp) {
-        long timeFrom = Math.max(0L, timestamp - windowBefore);
-        long timeTo = Math.max(0L, timestamp + windowAfter);
-
+    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
         long segFrom = segmentId(timeFrom);
         long segTo = segmentId(Math.max(0L, timeTo));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
index 73814ef..fcdcb9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
@@ -32,18 +32,16 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final long windowBefore;
-    private final long windowAfter;
     private final long retentionPeriod;
+    private final boolean retainDuplicates;
     private final int numSegments;
     private final Serdes serdes;
     private final Time time;
 
-    public RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
         this.name = name;
-        this.windowBefore = windowBefore;
-        this.windowAfter = windowAfter;
         this.retentionPeriod = retentionPeriod;
+        this.retainDuplicates = retainDuplicates;
         this.numSegments = numSegments;
         this.serdes = serdes;
         this.time = time;
@@ -54,7 +52,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, windowBefore, windowAfter, retentionPeriod, numSegments, serdes), "rocksdb-window", time);
+        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 344aa91..b17d889 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -25,9 +25,11 @@ public interface WindowStore<K, V> extends StateStore {
 
     void put(K key, V value);
 
-    byte[] putAndReturnInternalKey(K key, V value);
+    void put(K key, V value, long timestamp);
 
-    WindowStoreIterator<V> fetch(K key, long timestamp);
+    byte[] putAndReturnInternalKey(K key, V value, long timestamp);
+
+    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
     void putInternal(byte[] binaryKey, byte[] binaryValue);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index e57a00f..55d1ac3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -19,8 +19,10 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.kstream.KeyValue;
+
 import java.util.Iterator;
 
-public interface WindowStoreIterator<E> extends Iterator<E> {
+public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> {
     void close();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
new file mode 100644
index 0000000..ba596a9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamAggregateTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+
+        private class StringCanonizer implements Aggregator<String, String, String> {
+
+            @Override
+            public String initialValue() {
+                return "0";
+            }
+
+            @Override
+            public String add(String aggKey, String value, String aggregate) {
+                return aggregate + "+" + value;
+            }
+
+            @Override
+            public String remove(String aggKey, String value, String aggregate) {
+                return aggregate + "-" + value;
+            }
+
+            @Override
+            public String merge(String aggr1, String aggr2) {
+                return "(" + aggr1 + ") + (" + aggr2 + ")";
+            }
+        }
+
+        @Override
+        public Aggregator<String, String, String> get() {
+            return new StringCanonizer();
+        }
+    }
+
+    @Test
+    public void testAggBasic() throws Exception {
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+
+            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
+            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizeSupplier(),
+                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer);
+
+            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.setTime(0L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(1L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(2L);
+            driver.process(topic1, "C", "3");
+            driver.setTime(3L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(4L);
+            driver.process(topic1, "A", "1");
+
+            driver.setTime(5L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(6L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(7L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(8L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(9L);
+            driver.process(topic1, "C", "3");
+
+            driver.setTime(10L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(11L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(12L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(13L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(14L);
+            driver.process(topic1, "C", "3");
+
+            assertEquals(Utils.mkList(
+                    "[A@0]:0+1",
+                    "[B@0]:0+2",
+                    "[C@0]:0+3",
+                    "[D@0]:0+4",
+                    "[A@0]:0+1+1",
+
+                    "[A@0]:0+1+1+1", "[A@5]:0+1",
+                    "[B@0]:0+2+2", "[B@5]:0+2",
+                    "[D@0]:0+4+4", "[D@5]:0+4",
+                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                    "[C@0]:0+3+3", "[C@5]:0+3",
+
+                    "[A@5]:0+1+1", "[A@10]:0+1",
+                    "[B@5]:0+2+2+2", "[B@10]:0+2",
+                    "[D@5]:0+4+4", "[D@10]:0+4",
+                    "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+                    "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 90341a8..e763fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -125,7 +125,7 @@ public class KStreamKStreamJoinTest {
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
             // w2 = { 0:Y0, 1:Y1 }
             // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
@@ -137,7 +137,7 @@ public class KStreamKStreamJoinTest {
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
             // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
             // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
@@ -149,7 +149,7 @@ public class KStreamKStreamJoinTest {
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
             // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
             // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
             for (int i = 0; i < 2; i++) {
                 driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 189cf9d..b5037ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -47,7 +47,7 @@ public class KTableAggregateTest {
 
             @Override
             public String initialValue() {
-                return "";
+                return "0";
             }
 
             @Override
@@ -106,14 +106,14 @@ public class KTableAggregateTest {
             driver.process(topic1, "C", "8");
 
             assertEquals(Utils.mkList(
-                    "A:+1",
-                    "B:+2",
-                    "A:+1+3", "A:+1+3-1",
-                    "B:+2+4", "B:+2+4-2",
-                    "C:+5",
-                    "D:+6",
-                    "B:+2+4-2+7", "B:+2+4-2+7-4",
-                    "C:+5+8", "C:+5+8-5"), proc2.processed);
+                    "A:0+1",
+                    "B:0+2",
+                    "A:0+1+3", "A:0+1+3-1",
+                    "B:0+2+4", "B:0+2+4-2",
+                    "C:0+5",
+                    "D:0+6",
+                    "B:0+2+4-2+7", "B:0+2+4-2+7-4",
+                    "C:0+5+8", "C:0+5+8-5"), proc2.processed);
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
new file mode 100644
index 0000000..f9b6ba5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.TumblingWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowsTest {
+
+    @Test
+    public void hoppingWindows() {
+
+        HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L);
+
+        Map<Long, HoppingWindow> matched = windows.windowsFor(21L);
+
+        assertEquals(3, matched.size());
+
+        assertEquals(new HoppingWindow(10L, 22L), matched.get(10L));
+        assertEquals(new HoppingWindow(15L, 27L), matched.get(15L));
+        assertEquals(new HoppingWindow(20L, 32L), matched.get(20L));
+    }
+
+    @Test
+    public void tumblineWindows() {
+
+        TumblingWindows windows = TumblingWindows.of("test").with(12L);
+
+        Map<Long, TumblingWindow> matched = windows.windowsFor(21L);
+
+        assertEquals(1, matched.size());
+
+        assertEquals(new TumblingWindow(12L, 24L), matched.get(12L));
+    }
+
+    @Test
+    public void unlimitedWindows() {
+
+        UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L);
+
+        Map<Long, UnlimitedWindow> matched = windows.windowsFor(21L);
+
+        assertEquals(1, matched.size());
+
+        assertEquals(new UnlimitedWindow(10L), matched.get(10L));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
index 6bfddfe..fc7a4e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
@@ -53,13 +53,11 @@ public class RocksDBWindowStoreTest {
     private final long windowSize = 3;
     private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
 
-
-    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, long windowBefore, long windowAfter, Serdes<K, V> serdes) {
-        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", windowBefore, windowAfter, retentionPeriod, numSegments, serdes, null);
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context);
         return store;
-
     }
 
     @Test
@@ -83,7 +81,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -100,12 +98,12 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 5L);
                 store.put(5, "five");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
 
                 context.setTime(startTime + 3L);
                 store.put(2, "two+1");
@@ -120,21 +118,21 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 8L);
                 store.put(2, "two+6");
 
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -179,7 +177,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, 0, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -196,12 +194,12 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 5L);
                 store.put(5, "five");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
 
                 context.setTime(startTime + 3L);
                 store.put(2, "two+1");
@@ -216,21 +214,21 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 8L);
                 store.put(2, "two+6");
 
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -275,7 +273,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, 0, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -292,12 +290,12 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 5L);
                 store.put(5, "five");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
 
                 context.setTime(startTime + 3L);
                 store.put(2, "two+1");
@@ -312,21 +310,21 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 8L);
                 store.put(2, "two+6");
 
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -371,14 +369,14 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
                 context.setTime(startTime);
                 store.put(0, "zero");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
 
                 context.setTime(startTime);
                 store.put(0, "zero");
@@ -387,11 +385,11 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime);
                 store.put(0, "zero++");
 
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -430,7 +428,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
             try {
@@ -461,51 +459,52 @@ public class RocksDBWindowStoreTest {
                 store.put(5, "five");
                 assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
 
                 context.setTime(startTime + incr * 6);
                 store.put(6, "six");
                 assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+
 
                 context.setTime(startTime + incr * 7);
                 store.put(7, "seven");
                 assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
 
                 context.setTime(startTime + incr * 8);
                 store.put(8, "eight");
                 assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
-                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
 
                 // check segment directories
                 store.flush();
@@ -546,7 +545,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 context.setTime(startTime);
                 store.put(0, "zero");
@@ -595,7 +594,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
@@ -604,15 +603,15 @@ public class RocksDBWindowStoreTest {
 
                 assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
-                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
 
                 // check segment directories
                 store.flush();
@@ -633,7 +632,7 @@ public class RocksDBWindowStoreTest {
     private <E> List<E> toList(WindowStoreIterator<E> iterator) {
         ArrayList<E> list = new ArrayList<>();
         while (iterator.hasNext()) {
-            list.add(iterator.next());
+            list.add(iterator.next().value);
         }
         return list;
     }


[2/2] kafka git commit: KAFKA-3104: add windowed aggregation to KStream

Posted by gu...@apache.org.
KAFKA-3104: add windowed aggregation to KStream

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #781 from guozhangwang/K3104


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a62eb599
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a62eb599
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a62eb599

Branch: refs/heads/trunk
Commit: a62eb5993f5517a64dd1020b0a9bbd1012f7ee67
Parents: cc3570d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Jan 18 12:14:43 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jan 18 12:14:43 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/HoppingWindows.java   |  23 +-
 .../kafka/streams/kstream/JoinWindows.java      |  33 +--
 .../apache/kafka/streams/kstream/KStream.java   |  18 +-
 .../apache/kafka/streams/kstream/KTable.java    |  18 +-
 .../kafka/streams/kstream/SlidingWindows.java   |  67 ------
 .../kafka/streams/kstream/TumblingWindows.java  |  68 ++++++
 .../kafka/streams/kstream/UnlimitedWindows.java |   8 +-
 .../apache/kafka/streams/kstream/Window.java    |  19 ++
 .../apache/kafka/streams/kstream/Windowed.java  |   5 +
 .../apache/kafka/streams/kstream/Windows.java   |  27 ++-
 .../kstream/internals/KStreamAggWindow.java     |  51 ++++
 .../kstream/internals/KStreamAggregate.java     | 171 +++++++++++++
 .../kstream/internals/KStreamFlatMap.java       |  14 +-
 .../kstream/internals/KStreamFlatMapValues.java |  16 +-
 .../streams/kstream/internals/KStreamImpl.java  |  49 ++--
 .../kstream/internals/KStreamJoinWindow.java    |  11 +-
 .../kstream/internals/KStreamKStreamJoin.java   |  15 +-
 .../internals/KStreamKTableLeftJoin.java        |   6 +-
 .../streams/kstream/internals/KStreamMap.java   |  14 +-
 .../kstream/internals/KStreamMapValues.java     |  14 +-
 .../kstream/internals/KStreamTransform.java     |   8 +-
 .../internals/KTableKTableAbstractJoin.java     |   6 +-
 .../kstream/internals/KTableKTableJoin.java     |  20 +-
 .../kstream/internals/KTableKTableLeftJoin.java |  18 +-
 .../internals/KTableKTableOuterJoin.java        |  20 +-
 .../internals/KTableKTableRightJoin.java        |  18 +-
 .../kstream/internals/KTableMapValues.java      |  38 +--
 .../kstream/internals/KTableRepartitionMap.java |  38 +--
 .../kstream/internals/SlidingWindow.java        |  38 ---
 .../kstream/internals/TumblingWindow.java       |  38 +++
 .../kafka/streams/state/MeteredWindowStore.java |  18 +-
 .../kafka/streams/state/RocksDBStore.java       |   1 +
 .../kafka/streams/state/RocksDBWindowStore.java |  44 ++--
 .../state/RocksDBWindowStoreSupplier.java       |  10 +-
 .../apache/kafka/streams/state/WindowStore.java |   6 +-
 .../streams/state/WindowStoreIterator.java      |   4 +-
 .../kstream/internals/KStreamAggregateTest.java | 154 ++++++++++++
 .../internals/KStreamKStreamJoinTest.java       |   6 +-
 .../kstream/internals/KTableAggregateTest.java  |  18 +-
 .../streams/kstream/internals/WindowsTest.java  |  70 ++++++
 .../streams/state/RocksDBWindowStoreTest.java   | 239 +++++++++----------
 41 files changed, 1000 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
index d7141eb..f354ef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.kstream.internals.HoppingWindow;
 
-import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 public class HoppingWindows extends Windows<HoppingWindow> {
 
@@ -62,9 +62,22 @@ public class HoppingWindows extends Windows<HoppingWindow> {
     }
 
     @Override
-    public Collection<HoppingWindow> windowsFor(long timestamp) {
-        // TODO
-        return Collections.<HoppingWindow>emptyList();
+    public Map<Long, HoppingWindow> windowsFor(long timestamp) {
+        long enclosed = (size - 1) / period;
+
+        long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period);
+
+        Map<Long, HoppingWindow> windows = new HashMap<>();
+        while (windowStart <= timestamp) {
+            // add the window
+            HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size);
+            windows.put(windowStart, window);
+
+            // advance the step period
+            windowStart += this.period;
+        }
+
+        return windows;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 50aff9d..ffc1c1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -18,31 +18,27 @@
 package org.apache.kafka.streams.kstream;
 
 
-import org.apache.kafka.streams.kstream.internals.SlidingWindow;
+import org.apache.kafka.streams.kstream.internals.TumblingWindow;
 
-import java.util.Collection;
+import java.util.Map;
 
 /**
  * This class is used to specify the behaviour of windowed joins.
  */
-public class JoinWindows extends Windows<SlidingWindow> {
-
-    private static final int DEFAULT_NUM_SEGMENTS = 3;
+public class JoinWindows extends Windows<TumblingWindow> {
 
     public final long before;
     public final long after;
-    public final int segments;
 
-    private JoinWindows(String name, long before, long after, int segments) {
+    private JoinWindows(String name, long before, long after) {
         super(name);
 
         this.after = after;
         this.before = before;
-        this.segments = segments;
     }
 
     public static JoinWindows of(String name) {
-        return new JoinWindows(name, 0L, 0L, DEFAULT_NUM_SEGMENTS);
+        return new JoinWindows(name, 0L, 0L);
     }
 
     /**
@@ -53,7 +49,7 @@ public class JoinWindows extends Windows<SlidingWindow> {
      * @return
      */
     public JoinWindows within(long timeDifference) {
-        return new JoinWindows(this.name, timeDifference, timeDifference, this.segments);
+        return new JoinWindows(this.name, timeDifference, timeDifference);
     }
 
     /**
@@ -65,7 +61,7 @@ public class JoinWindows extends Windows<SlidingWindow> {
      * @return
      */
     public JoinWindows before(long timeDifference) {
-        return new JoinWindows(this.name, timeDifference, this.after, this.segments);
+        return new JoinWindows(this.name, timeDifference, this.after);
     }
 
     /**
@@ -77,22 +73,11 @@ public class JoinWindows extends Windows<SlidingWindow> {
      * @return
      */
     public JoinWindows after(long timeDifference) {
-        return new JoinWindows(this.name, this.before, timeDifference, this.segments);
-    }
-
-    /**
-     * Specifies the number of segments to be used for rolling the window store,
-     * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
-     *
-     * @param segments
-     * @return
-     */
-    protected JoinWindows segments(int segments) {
-        return new JoinWindows(name, before, after, segments);
+        return new JoinWindows(this.name, this.before, timeDifference);
     }
 
     @Override
-    public Collection<SlidingWindow> windowsFor(long timestamp) {
+    public Map<Long, TumblingWindow> windowsFor(long timestamp) {
         // this function should never be called
         throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index dace7e0..85d51e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -185,11 +185,11 @@ public interface KStream<K, V> {
      * @param otherValueDeserializer value deserializer for other stream,
      *                      if not specified the default serializer defined in the configs will be used
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      */
-    <V1, V2> KStream<K, V2> join(
+    <V1, R> KStream<K, R> join(
             KStream<K, V1> otherStream,
-            ValueJoiner<V, V1, V2> joiner,
+            ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V> thisValueSerializer,
@@ -217,11 +217,11 @@ public interface KStream<K, V> {
      * @param otherValueDeserializer value deserializer for other stream,
      *                      if not specified the default serializer defined in the configs will be used
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      */
-    <V1, V2> KStream<K, V2> outerJoin(
+    <V1, R> KStream<K, R> outerJoin(
             KStream<K, V1> otherStream,
-            ValueJoiner<V, V1, V2> joiner,
+            ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V> thisValueSerializer,
@@ -245,11 +245,11 @@ public interface KStream<K, V> {
      * @param otherValueDeserializer value deserializer for other stream,
      *                      if not specified the default serializer defined in the configs will be used
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      */
-    <V1, V2> KStream<K, V2> leftJoin(
+    <V1, R> KStream<K, R> leftJoin(
             KStream<K, V1> otherStream,
-            ValueJoiner<V, V1, V2> joiner,
+            ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V1> otherValueSerializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 9837dae..93eceec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -111,10 +111,10 @@ public interface KTable<K, V> {
      * @param other the instance of KTable joined with this stream
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      * @return the instance of KTable
      */
-    <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+    <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
      * Combines values of this KTable with another KTable using Outer Join.
@@ -122,10 +122,10 @@ public interface KTable<K, V> {
      * @param other the instance of KTable joined with this stream
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      * @return the instance of KTable
      */
-    <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+    <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
      * Combines values of this KTable with another KTable using Left Join.
@@ -133,10 +133,10 @@ public interface KTable<K, V> {
      * @param other the instance of KTable joined with this stream
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      * @return the instance of KTable
      */
-    <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+    <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
      * Aggregate values of this table by the selected key.
@@ -148,14 +148,14 @@ public interface KTable<K, V> {
      * @param <V1>   the value type of the aggregated table
      * @return the instance of KTable
      */
-    <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
+    <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier,
                                           KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                           Serializer<K1> keySerializer,
                                           Serializer<V1> valueSerializer,
-                                          Serializer<V2> aggValueSerializer,
+                                          Serializer<T> aggValueSerializer,
                                           Deserializer<K1> keyDeserializer,
                                           Deserializer<V1> valueDeserializer,
-                                          Deserializer<V2> aggValueDeserializer,
+                                          Deserializer<T> aggValueDeserializer,
                                           String name);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
deleted file mode 100644
index ffdb4ad..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-
-import org.apache.kafka.streams.kstream.internals.SlidingWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-public class SlidingWindows extends Windows<SlidingWindow> {
-
-    private static final long DEFAULT_SIZE_MS = 1000L;
-
-    public final long size;
-
-    private SlidingWindows(String name, long size) {
-        super(name);
-
-        this.size = size;
-    }
-
-    /**
-     * Returns a half-interval sliding window definition with the default window size
-     */
-    public static SlidingWindows of(String name) {
-        return new SlidingWindows(name, DEFAULT_SIZE_MS);
-    }
-
-    /**
-     * Returns a half-interval sliding window definition with the window size in milliseconds
-     */
-    public SlidingWindows with(long size) {
-        return new SlidingWindows(this.name, size);
-    }
-
-    @Override
-    public Collection<SlidingWindow> windowsFor(long timestamp) {
-        // TODO
-        return Collections.<SlidingWindow>emptyList();
-    }
-
-    @Override
-    public boolean equalTo(Windows other) {
-        if (!other.getClass().equals(SlidingWindows.class))
-            return false;
-
-        SlidingWindows otherWindows = (SlidingWindows) other;
-
-        return this.size == otherWindows.size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
new file mode 100644
index 0000000..02ece3a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.streams.kstream.internals.TumblingWindow;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class TumblingWindows extends Windows<TumblingWindow> {
+
+    private static final long DEFAULT_SIZE_MS = 1000L;
+
+    public final long size;
+
+    private TumblingWindows(String name, long size) {
+        super(name);
+
+        this.size = size;
+    }
+
+    /**
+     * Returns a tumbling window definition with the default window size
+     */
+    public static TumblingWindows of(String name) {
+        return new TumblingWindows(name, DEFAULT_SIZE_MS);
+    }
+
+    /**
+     * Returns a tumbling window definition with the window size in milliseconds
+     */
+    public TumblingWindows with(long size) {
+        return new TumblingWindows(this.name, size);
+    }
+
+    @Override
+    public Map<Long, TumblingWindow> windowsFor(long timestamp) {
+        long windowStart = timestamp - timestamp % size;
+
+        return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size));
+    }
+
+    @Override
+    public boolean equalTo(Windows other) {
+        if (!other.getClass().equals(TumblingWindows.class))
+            return false;
+
+        TumblingWindows otherWindows = (TumblingWindows) other;
+
+        return this.size == otherWindows.size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 89cb0a8..6f47253 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 
-import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 
 public class UnlimitedWindows extends Windows<UnlimitedWindow> {
 
@@ -46,9 +46,9 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     }
 
     @Override
-    public Collection<UnlimitedWindow> windowsFor(long timestamp) {
-        // TODO
-        return Collections.<UnlimitedWindow>emptyList();
+    public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
+        // always return the single unlimited window
+        return Collections.singletonMap(start, new UnlimitedWindow(start));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 63e0a35..b9401b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -48,4 +48,23 @@ public abstract class Window {
     public boolean equalsTo(Window other) {
         return this.start() == other.start() && this.end() == other.end();
     }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (!(obj instanceof Window))
+            return false;
+
+        Window other = (Window) obj;
+
+        return this.equalsTo(other) && this.start == other.start && this.end == other.end;
+    }
+
+    @Override
+    public int hashCode() {
+        long n = (this.start << 32) | this.end;
+        return (int) (n % 0xFFFFFFFFL);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 03fb656..10afc73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -35,4 +35,9 @@ public class Windowed<T> {
     public Window window() {
         return window;
     }
+
+    @Override
+    public String toString() {
+        return "[" + value + "@" + window.start() + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index ab8d822..e4d7d9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,25 +17,31 @@
 
 package org.apache.kafka.streams.kstream;
 
-import java.util.Collection;
+
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class Windows<W extends Window> {
 
+    private static final int DEFAULT_NUM_SEGMENTS = 3;
+
     private static final long DEFAULT_EMIT_DURATION = 1000L;
 
     private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
 
     private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
 
+    protected String name;
+
     private long emitDuration;
 
     private long maintainDuration;
 
-    protected String name;
+    public int segments;
 
     protected Windows(String name) {
         this.name = name;
+        this.segments = DEFAULT_NUM_SEGMENTS;
         this.emitDuration = DEFAULT_EMIT_DURATION;
         this.maintainDuration = DEFAULT_MAINTAIN_DURATION;
     }
@@ -62,6 +68,19 @@ public abstract class Windows<W extends Window> {
         return this;
     }
 
+    /**
+     * Specifies the number of segments to be used for rolling the window store,
+     * this function is not exposed to users but can be called by developers that extend this Windows spec
+     *
+     * @param segments
+     * @return
+     */
+    protected Windows segments(int segments) {
+        this.segments = segments;
+
+        return this;
+    }
+
     public long emitEveryMs() {
         return this.emitDuration;
     }
@@ -74,7 +93,7 @@ public abstract class Windows<W extends Window> {
         return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
     }
 
-    abstract boolean equalTo(Windows other);
+    public abstract boolean equalTo(Windows other);
 
-    abstract Collection<W> windowsFor(long timestamp);
+    public abstract Map<Long, W> windowsFor(long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
new file mode 100644
index 0000000..f02f53a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class KStreamAggWindow<K, V> implements ProcessorSupplier<K, V> {
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamAggWindowProcessor();
+    }
+
+    private class KStreamAggWindowProcessor extends AbstractProcessor<K, V> {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            // create a dummy window just for wrapping the timestamp
+            long timestamp = context().timestamp();
+
+            // send the new aggregate value
+            context().forward(new Windowed<>(key, new UnlimitedWindow(timestamp)), new Change<>(value, null));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
new file mode 100644
index 0000000..5745a03
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamAggregate<K, V, T, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, T> {
+
+    private final String storeName;
+    private final Windows<W> windows;
+    private final Aggregator<K, V, T> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamAggregate(Windows<W> windows, String storeName, Aggregator<K, V, T> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<Windowed<K>, Change<V>> get() {
+        return new KStreamAggregateProcessor();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(Windowed<K> windowedKey, Change<V> change) {
+            // first get the matching windows
+            long timestamp = windowedKey.window().start();
+            K key = windowedKey.value();
+            V value = change.newValue;
+
+            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+            long timeFrom = Long.MAX_VALUE;
+            long timeTo = Long.MIN_VALUE;
+
+            // use range query on window store for efficient reads
+            for (long windowStartMs : matchedWindows.keySet()) {
+                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+            }
+
+            WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+            // for each matching window, try to update the corresponding key and send to the downstream
+            while (iter.hasNext()) {
+                KeyValue<Long, T> entry = iter.next();
+                W window = matchedWindows.get(entry.key);
+
+                if (window != null) {
+
+                    T oldAgg = entry.value;
+
+                    if (oldAgg == null)
+                        oldAgg = aggregator.initialValue();
+
+                    // try to add the new value (there will never be an old value)
+                    T newAgg = aggregator.add(key, value, oldAgg);
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, window.start());
+
+                    // forward the aggregated change pair
+                    if (sendOldValues)
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                    else
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+                    matchedWindows.remove(entry.key);
+                }
+            }
+
+            iter.close();
+
+            // create new windows for the remaining unmatched windows that do not exist yet
+            for (long windowStartMs : matchedWindows.keySet()) {
+                T oldAgg = aggregator.initialValue();
+                T newAgg = aggregator.add(key, value, oldAgg);
+
+                windowStore.put(key, newAgg, windowStartMs);
+
+                // send the new aggregate pair
+                if (sendOldValues)
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
+                else
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, T> view() {
+
+        return new KTableValueGetterSupplier<Windowed<K>, T>() {
+
+            public KTableValueGetter<Windowed<K>, T> get() {
+                return new KStreamAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T get(Windowed<K> windowedKey) {
+            K key = windowedKey.value();
+            W window = (W) windowedKey.window();
+
+            // this iterator should only contain one element
+            Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
+
+            return iter.next().value;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index 175a002..daef8b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
+    private final KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper;
 
-    KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) {
+    KStreamFlatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamFlatMapProcessor();
     }
 
-    private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) {
+        public void process(K key, V value) {
+            for (KeyValue<K1, V1> newPair : mapper.apply(key, value)) {
                 context().forward(newPair.key, newPair.value);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index 9b4559b..97d6b7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -22,24 +22,24 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
-    private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
+    private final ValueMapper<V, ? extends Iterable<V1>> mapper;
 
-    KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) {
+    KStreamFlatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamFlatMapValuesProcessor();
     }
 
-    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            Iterable<V2> newValues = mapper.apply(value);
-            for (V2 v : newValues) {
+        public void process(K key, V value) {
+            Iterable<V1> newValues = mapper.apply(value);
+            for (V1 v : newValues) {
                 context().forward(key, v);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2459f0d..7b634dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -305,28 +305,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         RocksDBWindowStoreSupplier<K, V> thisWindow =
                 new RocksDBWindowStoreSupplier<>(
                         windows.name() + "-this",
-                        windows.before,
-                        windows.after,
                         windows.maintainMs(),
                         windows.segments,
+                        true,
                         new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
                         null);
 
         RocksDBWindowStoreSupplier<K, V1> otherWindow =
                 new RocksDBWindowStoreSupplier<>(
                         windows.name() + "-other",
-                        windows.before,
-                        windows.after,
                         windows.maintainMs(),
                         windows.segments,
+                        true,
                         new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
                         null);
 
-        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
+        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+
+        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, outer);
+        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer);
 
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, outer);
-        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), reverseJoiner(joiner), outer);
         KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
 
         String thisWindowStreamName = topology.newName(WINDOWED_NAME);
@@ -362,15 +361,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         RocksDBWindowStoreSupplier<K, V1> otherWindow =
                 new RocksDBWindowStoreSupplier<>(
                         windows.name() + "-this",
-                        windows.before,
-                        windows.after,
                         windows.maintainMs(),
                         windows.segments,
+                        true,
                         new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
                         null);
 
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, true);
+        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
 
         String otherWindowStreamName = topology.newName(WINDOWED_NAME);
         String joinThisName = topology.newName(LEFTJOIN_NAME);
@@ -401,8 +399,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                        Serializer<T> aggValueSerializer,
                                                                        Deserializer<K> keyDeserializer,
                                                                        Deserializer<T> aggValueDeserializer) {
-        // TODO
-        return null;
+
+        // TODO: this agg window operator is only used for casting K to Windowed<K> for
+        // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
+        String aggregateName = topology.newName(AGGREGATE_NAME);
+        String aggWindowName = topology.newName(WINDOWED_NAME);
+
+        ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
+        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
+
+        RocksDBWindowStoreSupplier<K, T> aggregateStore =
+                new RocksDBWindowStoreSupplier<>(
+                        windows.name(),
+                        windows.maintainMs(),
+                        windows.segments,
+                        false,
+                        new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
+                        null);
+
+        // aggregate the values with the aggregator and local store
+        topology.addProcessor(aggWindowName, aggWindowSupplier, this.name);
+        topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName);
+        topology.addStateStore(aggregateStore, aggregateName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index b122aa1..4f427d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,9 +27,14 @@ import org.apache.kafka.streams.state.WindowStore;
 class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
 
     private final String windowName;
+    private final long windowSizeMs;
+    private final long retentionPeriodMs;
 
-    KStreamJoinWindow(String windowName) {
+
+    KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
         this.windowName = windowName;
+        this.windowSizeMs = windowSizeMs;
+        this.retentionPeriodMs = retentionPeriodMs;
     }
 
     @Override
@@ -46,6 +52,9 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
             super.init(context);
 
             window = (WindowStore<K, V>) context.getStateStore(windowName);
+
+            if (windowSizeMs * 2 > retentionPeriodMs)
+                throw new KafkaException("The retention period must be at least two times the join window size.");
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 8a9bf6c..01e3325 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -29,11 +30,16 @@ import java.util.Iterator;
 class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
     private final String otherWindowName;
+    private final long joinBeforeMs;
+    private final long joinAfterMs;
+
     private final ValueJoiner<V1, V2, R> joiner;
     private final boolean outer;
 
-    KStreamKStreamJoin(String otherWindowName, ValueJoiner<V1, V2, R> joiner, boolean outer) {
+    KStreamKStreamJoin(String otherWindowName, long joinBeforeMs, long joinAfterMs, ValueJoiner<V1, V2, R> joiner, boolean outer) {
         this.otherWindowName = otherWindowName;
+        this.joinBeforeMs = joinBeforeMs;
+        this.joinAfterMs = joinAfterMs;
         this.joiner = joiner;
         this.outer = outer;
     }
@@ -59,10 +65,13 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
         public void process(K key, V1 value) {
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 
-            Iterator<V2> iter = otherWindow.fetch(key, context().timestamp());
+            long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
+            long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
+
+            Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo);
             while (iter.hasNext()) {
                 needOuterJoin = false;
-                context().forward(key, joiner.apply(value, iter.next()));
+                context().forward(key, joiner.apply(value, iter.next().value));
             }
 
             if (needOuterJoin)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index 51a6277..dfca019 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -23,12 +23,12 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamKTableLeftJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
+class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
     private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
-    private final ValueJoiner<V1, V2, V> joiner;
+    private final ValueJoiner<V1, V2, R> joiner;
 
-    KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, V> joiner) {
+    KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) {
         this.valueGetterSupplier = table.valueGetterSupplier();
         this.joiner = joiner;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 3868318..57f1431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+    private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
 
-    public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+    public KStreamMap(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamMapProcessor();
     }
 
-    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            KeyValue<K2, V2> newPair = mapper.apply(key, value);
+        public void process(K key, V value) {
+            KeyValue<K1, V1> newPair = mapper.apply(key, value);
             context().forward(newPair.key, newPair.value);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 692b421..06667e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -22,23 +22,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
-    private final ValueMapper<V1, V2> mapper;
+    private final ValueMapper<V, V1> mapper;
 
-    public KStreamMapValues(ValueMapper<V1, V2> mapper) {
+    public KStreamMapValues(ValueMapper<V, V1> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamMapProcessor();
     }
 
-    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            V2 newValue = mapper.apply(value);
+        public void process(K key, V value) {
+            V1 newValue = mapper.apply(value);
             context().forward(key, newValue);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 7ebab0e..a9d8f97 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -24,16 +24,16 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier;
+    private final TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier;
 
-    public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) {
+    public KStreamTransform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
         this.transformerSupplier = transformerSupplier;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamTransformProcessor(transformerSupplier.get());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index ad987dd..5e441aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -19,19 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueJoiner;
 
-abstract class KTableKTableAbstractJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessorSupplier<K, V1, R> {
 
     protected final KTableImpl<K, ?, V1> table1;
     protected final KTableImpl<K, ?, V2> table2;
     protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
     protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    protected final ValueJoiner<V1, V2, V> joiner;
+    protected final ValueJoiner<V1, V2, R> joiner;
 
     protected boolean sendOldValues = false;
 
     KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1,
                              KTableImpl<K, ?, V2> table2,
-                             ValueJoiner<V1, V2, V> joiner) {
+                             ValueJoiner<V1, V2, R> joiner) {
         this.table1 = table1;
         this.table2 = table2;
         this.valueGetterSupplier1 = table1.valueGetterSupplier();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 9716edd..6eb27b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -34,10 +34,10 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -61,8 +61,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = null;
 
             if (change.newValue != null || change.oldValue != null)
@@ -78,7 +78,7 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
         }
     }
 
-    private class KTableKTableJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -95,8 +95,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
         }
 
         @Override
-        public V get(K key) {
-            V newValue = null;
+        public R get(K key) {
+            R newValue = null;
             V1 value1 = valueGetter1.get(key);
 
             if (value1 != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index b10bdb5..00e872e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -34,10 +34,10 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -61,8 +61,8 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = null;
 
             if (change.newValue != null || change.oldValue != null)
@@ -79,7 +79,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
     }
 
-    private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -96,7 +96,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
 
         @Override
-        public V get(K key) {
+        public R get(K key) {
             V1 value1 = valueGetter1.get(key);
 
             if (value1 != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index b859b34..6ab0ae9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -34,10 +34,10 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -61,8 +61,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = valueGetter.get(key);
 
             if (change.newValue != null || value2 != null)
@@ -77,7 +77,7 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
     }
 
-    private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -94,8 +94,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
 
         @Override
-        public V get(K key) {
-            V newValue = null;
+        public R get(K key) {
+            R newValue = null;
             V1 value1 = valueGetter1.get(key);
             V2 value2 = valueGetter2.get(key);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index f20e987..a6a13fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -22,10 +22,10 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
 
-    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -35,10 +35,10 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -62,8 +62,8 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = valueGetter.get(key);
 
             if (value2 != null) {
@@ -77,7 +77,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
     }
 
-    private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
 
         @Override
-        public V get(K key) {
+        public R get(K key) {
             V2 value2 = valueGetter2.get(key);
 
             if (value2 != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c664906..244d8ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -23,30 +23,30 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 
-class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
+class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
-    private final KTableImpl<K1, ?, V1> parent;
-    private final ValueMapper<V1, V2> mapper;
+    private final KTableImpl<K, ?, V> parent;
+    private final ValueMapper<V, V1> mapper;
 
     private boolean sendOldValues = false;
 
-    public KTableMapValues(KTableImpl<K1, ?, V1> parent, ValueMapper<V1, V2> mapper) {
+    public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<V, V1> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, Change<V1>> get() {
+    public Processor<K, Change<V>> get() {
         return new KTableMapValuesProcessor();
     }
 
     @Override
-    public KTableValueGetterSupplier<K1, V2> view() {
-        final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+    public KTableValueGetterSupplier<K, V1> view() {
+        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
 
-        return new KTableValueGetterSupplier<K1, V2>() {
+        return new KTableValueGetterSupplier<K, V1>() {
 
-            public KTableValueGetter<K1, V2> get() {
+            public KTableValueGetter<K, V1> get() {
                 return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
             }
 
@@ -59,8 +59,8 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         sendOldValues = true;
     }
 
-    private V2 computeValue(V1 value) {
-        V2 newValue = null;
+    private V1 computeValue(V value) {
+        V1 newValue = null;
 
         if (value != null)
             newValue = mapper.apply(value);
@@ -68,22 +68,22 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         return newValue;
     }
 
-    private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> {
+    private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
 
         @Override
-        public void process(K1 key, Change<V1> change) {
-            V2 newValue = computeValue(change.newValue);
-            V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
+        public void process(K key, Change<V> change) {
+            V1 newValue = computeValue(change.newValue);
+            V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
 
             context().forward(key, new Change<>(newValue, oldValue));
         }
     }
 
-    private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
+    private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
 
-        private final KTableValueGetter<K1, V1> parentGetter;
+        private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+        public KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
@@ -93,7 +93,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         }
 
         @Override
-        public V2 get(K1 key) {
+        public V1 get(K key) {
             return computeValue(parentGetter.get(key));
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index bbef7fb..12fcc17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -29,28 +29,28 @@ import org.apache.kafka.streams.processor.ProcessorContext;
  *
  * Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
  */
-public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> {
+public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSupplier<K, V, KeyValue<K1, V1>> {
 
-    private final KTableImpl<K1, ?, V1> parent;
-    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+    private final KTableImpl<K, ?, V> parent;
+    private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
 
-    public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+    public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, Change<V1>> get() {
+    public Processor<K, Change<V>> get() {
         return new KTableMapProcessor();
     }
 
     @Override
-    public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() {
-        final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+    public KTableValueGetterSupplier<K, KeyValue<K1, V1>> view() {
+        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
 
-        return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() {
+        return new KTableValueGetterSupplier<K, KeyValue<K1, V1>>() {
 
-            public KTableValueGetter<K1, KeyValue<K2, V2>> get() {
+            public KTableValueGetter<K, KeyValue<K1, V1>> get() {
                 return new KTableMapValueGetter(parentValueGetterSupplier.get());
             }
 
@@ -63,8 +63,8 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
         throw new KafkaException("KTableRepartitionMap should always require sending old values.");
     }
 
-    private KeyValue<K2, V2> computeValue(K1 key, V1 value) {
-        KeyValue<K2, V2> newValue = null;
+    private KeyValue<K1, V1> computeValue(K key, V value) {
+        KeyValue<K1, V1> newValue = null;
 
         if (key != null || value != null)
             newValue = mapper.apply(key, value);
@@ -72,26 +72,26 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
         return newValue;
     }
 
-    private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
+    private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
 
         @Override
-        public void process(K1 key, Change<V1> change) {
-            KeyValue<K2, V2> newPair = computeValue(key, change.newValue);
+        public void process(K key, Change<V> change) {
+            KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
 
             context().forward(newPair.key, new Change<>(newPair.value, null));
 
             if (change.oldValue != null) {
-                KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue);
+                KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue);
                 context().forward(oldPair.key, new Change<>(null, oldPair.value));
             }
         }
     }
 
-    private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> {
+    private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
 
-        private final KTableValueGetter<K1, V1> parentGetter;
+        private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+        public KTableMapValueGetter(KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
@@ -101,7 +101,7 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
         }
 
         @Override
-        public KeyValue<K2, V2> get(K1 key) {
+        public KeyValue<K1, V1> get(K key) {
             return computeValue(key, parentGetter.get(key));
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
deleted file mode 100644
index a6b5149..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.streams.kstream.Window;
-
-public class SlidingWindow extends Window {
-
-    public SlidingWindow(long start, long end) {
-        super(start, end);
-    }
-
-    @Override
-    public boolean overlap(Window other) {
-        return super.overlap(other) && other.getClass().equals(SlidingWindow.class);
-    }
-
-    @Override
-    public boolean equalsTo(Window other) {
-        return super.equalsTo(other) && other.getClass().equals(SlidingWindow.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
new file mode 100644
index 0000000..a02d4b9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class TumblingWindow extends Window {
+
+    public TumblingWindow(long start, long end) {
+        super(start, end);
+    }
+
+    @Override
+    public boolean overlap(Window other) {
+        return super.overlap(other) && other.getClass().equals(TumblingWindow.class);
+    }
+
+    @Override
+    public boolean equalsTo(Window other) {
+        return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
index d4ed0e7..cfcfb00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 
@@ -97,20 +98,25 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(K key, long timestamp) {
-        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timestamp), this.rangeTime);
+    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
     }
 
     @Override
     public void put(K key, V value) {
-        putAndReturnInternalKey(key, value);
+        putAndReturnInternalKey(key, value, -1L);
     }
 
     @Override
-    public byte[] putAndReturnInternalKey(K key, V value) {
+    public void put(K key, V value, long timestamp) {
+        putAndReturnInternalKey(key, value, timestamp);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
         long startNs = time.nanoseconds();
         try {
-            byte[] binKey = this.inner.putAndReturnInternalKey(key, value);
+            byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
 
             if (loggingEnabled) {
                 changeLogger.add(binKey);
@@ -174,7 +180,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         @Override
-        public E next() {
+        public KeyValue<Long, E> next() {
             return iter.next();
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
index a32faf4..62b9f2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -222,6 +222,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         @Override
         public void close() {
+            iter.dispose();
         }
 
     }