You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/19 00:02:56 UTC

[2/3] kafka git commit: KAFKA-5192: add WindowStore range scan (KIP-155)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 3036f79..77b92a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -38,9 +38,12 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 class Segments {
     private static final Logger log = LoggerFactory.getLogger(Segments.class);
-
     static final long MIN_SEGMENT_INTERVAL = 60 * 1000L;
 
+    static long segmentInterval(long retentionPeriod, int numSegments) {
+        return Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL); // never below MIN_SEGMENT_INTERVAL; numSegments == 1 divides by zero (ArithmeticException)
+    }
+
     private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
     private final String name;
     private final int numSegments;
@@ -52,7 +55,7 @@ class Segments {
     Segments(final String name, final long retentionPeriod, final int numSegments) {
         this.name = name;
         this.numSegments = numSegments;
-        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+        this.segmentInterval = segmentInterval(retentionPeriod, numSegments);
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
         this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 80785b2..6d6d9bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -23,10 +23,15 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 
 class SessionKeySchema implements SegmentedBytesStore.KeySchema {
+
+    private static final int SUFFIX_SIZE = 2 * WindowStoreUtils.TIMESTAMP_SIZE;
+    private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
+
     private String topic;
 
     @Override
@@ -35,33 +40,49 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
     }
 
     @Override
-    public Bytes upperRange(final Bytes key, final long to) {
+    public Bytes upperRangeFixedSize(final Bytes key, final long to) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
         return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
     }
 
     @Override
-    public Bytes lowerRange(final Bytes key, final long from) {
+    public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
         final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
         return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
     }
 
     @Override
+    public Bytes upperRange(Bytes key, long to) {
+        // both suffix timestamps are `to`: start can at most be equal to end
+        final ByteBuffer suffix = ByteBuffer.allocate(SUFFIX_SIZE);
+        suffix.putLong(to);
+        suffix.putLong(to);
+        final byte[] maxSuffix = suffix.array();
+        return OrderedBytes.upperRange(key, maxSuffix);
+    }
+
+    @Override
+    public Bytes lowerRange(Bytes key, long from) {
+        return OrderedBytes.lowerRange(key, MIN_SUFFIX); // `from` is not consulted; MIN_SUFFIX is SUFFIX_SIZE zero bytes
+    }
+
+    @Override
     public long segmentTimestamp(final Bytes key) {
         return SessionKeySerde.extractEnd(key.get());
     }
 
     @Override
-    public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
+    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
         return new HasNextCondition() {
             @Override
             public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
                 while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
                     final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
-                    if (windowedKey.key().equals(binaryKey)
-                            && windowedKey.window().end() >= from
-                            && windowedKey.window().start() <= to) {
+                    if (windowedKey.key().compareTo(binaryKeyFrom) >= 0
+                        && windowedKey.key().compareTo(binaryKeyTo) <= 0
+                        && windowedKey.window().end() >= from
+                        && windowedKey.window().start() <= to) {
                         return true;
                     }
                     iterator.next();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index b9a8665..214f36b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -21,9 +21,14 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+
+    private static final int SUFFIX_SIZE = WindowStoreUtils.TIMESTAMP_SIZE + WindowStoreUtils.SEQNUM_SIZE;
+    private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
+
     private StateSerdes<Bytes, byte[]> serdes;
 
     @Override
@@ -33,21 +38,36 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
     @Override
     public Bytes upperRange(final Bytes key, final long to) {
-        return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes);
+        // suffix is the timestamp `to` followed by Integer.MAX_VALUE (SUFFIX_SIZE bytes in total)
+        final ByteBuffer suffix = ByteBuffer.allocate(SUFFIX_SIZE);
+        suffix.putLong(to);
+        suffix.putInt(Integer.MAX_VALUE);
+
+        return OrderedBytes.upperRange(key, suffix.array());
     }
 
     @Override
     public Bytes lowerRange(final Bytes key, final long from) {
+        return OrderedBytes.lowerRange(key, MIN_SUFFIX); // `from` is not consulted; MIN_SUFFIX is SUFFIX_SIZE zero bytes
+    }
+
+    @Override
+    public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
         return WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes);
     }
 
     @Override
+    public Bytes upperRangeFixedSize(final Bytes key, final long to) {
+        return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes);
+    }
+
+    @Override
     public long segmentTimestamp(final Bytes key) {
         return WindowStoreUtils.timestampFromBinaryKey(key.get());
     }
 
     @Override
-    public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
+    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
         return new HasNextCondition() {
             @Override
             public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
@@ -55,9 +75,10 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
                     final Bytes bytes = iterator.peekNextKey();
                     final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
                     final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
-                    if (keyBytes.equals(binaryKey)
-                            && time >= from
-                            && time <= to) {
+                    if (keyBytes.compareTo(binaryKeyFrom) >= 0
+                        && keyBytes.compareTo(binaryKeyTo) <= 0
+                        && time >= from
+                        && time <= to) {
                         return true;
                     }
                     iterator.next();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
new file mode 100644
index 0000000..4fd6f3e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.NoSuchElementException;
+
+/**
+ * Wraps an iterator over binary window-store keys and exposes it either as a
+ * timestamp-keyed {@link WindowStoreIterator} ({@link #valuesIterator()}) or as a
+ * {@link Windowed}-keyed {@link KeyValueIterator} ({@link #keyValueIterator()}).
+ * Both views read from, and close, the same underlying bytes iterator.
+ */
+class WindowStoreIteratorWrapper<K, V> {
+
+    // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
+    private static class WrappedWindowStoreBytesIterator extends WindowStoreIteratorWrapper<Bytes, byte[]> {
+        WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+                                        final StateSerdes<Bytes, byte[]> serdes,
+                                        final long windowSize) {
+            super(underlying, serdes, windowSize);
+        }
+
+        @Override
+        public WindowStoreIterator<byte[]> valuesIterator() {
+            return new WrappedWindowStoreIterator<byte[]>(bytesIterator, serdes) {
+                @Override
+                public KeyValue<Long, byte[]> next() {
+                    // values are already raw bytes, so no deserialization is needed
+                    final KeyValue<Bytes, byte[]> next = nextOrThrow(bytesIterator);
+                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+                    return KeyValue.pair(timestamp, next.value);
+                }
+            };
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator() {
+            return new WrappedKeyValueIterator<Bytes, byte[]>(bytesIterator, serdes, windowSize) {
+                @Override
+                public Windowed<Bytes> peekNextKey() {
+                    final Bytes next = bytesIterator.peekNextKey();
+                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
+                    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
+                    return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
+                }
+
+                @Override
+                public KeyValue<Windowed<Bytes>, byte[]> next() {
+                    final KeyValue<Bytes, byte[]> next = nextOrThrow(bytesIterator);
+                    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+                    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.key.get());
+                    return KeyValue.pair(
+                        new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)),
+                        next.value
+                    );
+                }
+            };
+        }
+    }
+
+    /**
+     * Creates a wrapper whose views return keys and values as raw bytes, skipping serde conversion.
+     */
+    static WindowStoreIteratorWrapper<Bytes, byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+                                                                   final StateSerdes<Bytes, byte[]> serdes,
+                                                                   final long windowSize) {
+        return new WrappedWindowStoreBytesIterator(underlying, serdes, windowSize);
+    }
+
+    /**
+     * Returns the next entry of {@code iterator}, enforcing the {@link java.util.Iterator#next()} contract.
+     *
+     * @throws NoSuchElementException if the iterator has no more entries
+     */
+    private static KeyValue<Bytes, byte[]> nextOrThrow(final KeyValueIterator<Bytes, byte[]> iterator) {
+        if (!iterator.hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return iterator.next();
+    }
+
+    protected final KeyValueIterator<Bytes, byte[]> bytesIterator;
+    protected final StateSerdes<K, V> serdes;
+    protected final long windowSize;
+
+    WindowStoreIteratorWrapper(
+        final KeyValueIterator<Bytes, byte[]> bytesIterator,
+        final StateSerdes<K, V> serdes,
+        final long windowSize
+    ) {
+        this.bytesIterator = bytesIterator;
+        this.serdes = serdes;
+        this.windowSize = windowSize;
+    }
+
+    /** View keyed by the timestamp read from each binary key, with values deserialized through the serdes. */
+    public WindowStoreIterator<V> valuesIterator() {
+        return new WrappedWindowStoreIterator<>(bytesIterator, serdes);
+    }
+
+    /** View keyed by {@link Windowed} key, with keys and values deserialized through the serdes. */
+    public KeyValueIterator<Windowed<K>, V> keyValueIterator() {
+        return new WrappedKeyValueIterator<>(bytesIterator, serdes, windowSize);
+    }
+
+    private static class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator;
+        final StateSerdes<?, V> serdes;
+
+        WrappedWindowStoreIterator(
+            final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<?, V> serdes) {
+            this.bytesIterator = bytesIterator;
+            this.serdes = serdes;
+        }
+
+        @Override
+        public Long peekNextKey() {
+            return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get());
+        }
+
+        @Override
+        public boolean hasNext() {
+            return bytesIterator.hasNext();
+        }
+
+        /**
+         * @throws NoSuchElementException if no next element exists
+         */
+        @Override
+        public KeyValue<Long, V> next() {
+            final KeyValue<Bytes, byte[]> next = nextOrThrow(bytesIterator);
+            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+            final V value = serdes.valueFrom(next.value);
+            return KeyValue.pair(timestamp, value);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
+        }
+
+        @Override
+        public void close() {
+            bytesIterator.close();
+        }
+    }
+
+    private static class WrappedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator;
+        final StateSerdes<K, V> serdes;
+        final long windowSize;
+
+        WrappedKeyValueIterator(
+            final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<K, V> serdes, final long windowSize) {
+            this.bytesIterator = bytesIterator;
+            this.serdes = serdes;
+            this.windowSize = windowSize;
+        }
+
+        @Override
+        public Windowed<K> peekNextKey() {
+            final byte[] nextKey = bytesIterator.peekNextKey().get();
+            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(nextKey);
+            final K key = WindowStoreUtils.keyFromBinaryKey(nextKey, serdes);
+            return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
+        }
+
+        @Override
+        public boolean hasNext() {
+            return bytesIterator.hasNext();
+        }
+
+        /**
+         * @throws NoSuchElementException if no next element exists
+         */
+        @Override
+        public KeyValue<Windowed<K>, V> next() {
+            final KeyValue<Bytes, byte[]> next = nextOrThrow(bytesIterator);
+            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+            final K key = WindowStoreUtils.keyFromBinaryKey(next.key.get(), serdes);
+            final V value = serdes.valueFrom(next.value);
+            return KeyValue.pair(
+                new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)),
+                value
+            );
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
+        }
+
+        @Override
+        public void close() {
+            bytesIterator.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index faf2899..ed79947 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -19,14 +19,15 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import java.nio.ByteBuffer;
 
 public class WindowStoreUtils {
 
-    private static final int SEQNUM_SIZE = 4;
-    private static final int TIMESTAMP_SIZE = 8;
+    static final int SEQNUM_SIZE = 4;
+    static final int TIMESTAMP_SIZE = 8;
 
     /** Inner byte array serde used for segments */
     static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
@@ -73,4 +74,13 @@ public class WindowStoreUtils {
     static int sequenceNumberFromBinaryKey(byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
     }
+
+    /**
+     * Create a TimeWindow from startMs to startMs + windowSize; when that sum
+     * is negative (as after long overflow) the end becomes Long.MAX_VALUE.
+     */
+    static TimeWindow timeWindowForSize(final long startMs, final long windowSize) {
+        final long sum = startMs + windowSize;
+        return sum < 0 ? new TimeWindow(startMs, Long.MAX_VALUE) : new TimeWindow(startMs, sum);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
deleted file mode 100644
index 1ce6b04..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.util.NoSuchElementException;
-
-class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> {
-    final KeyValueIterator<Bytes, byte[]> bytesIterator;
-    private final StateSerdes<?, V> serdes;
-
-    // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
-    private static class WrappedWindowStoreBytesIterator extends WrappedWindowStoreIterator<byte[]> {
-        WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
-                                        final StateSerdes<Bytes, byte[]> serdes) {
-            super(underlying, serdes);
-        }
-
-        @Override
-        public KeyValue<Long, byte[]> next() {
-            if (!bytesIterator.hasNext()) {
-                throw new NoSuchElementException();
-            }
-
-            final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
-            final byte[] value = next.value;
-            return KeyValue.pair(timestamp, value);
-        }
-    }
-
-    static WrappedWindowStoreIterator<byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
-                                                            final StateSerdes<Bytes, byte[]> serdes) {
-        return new WrappedWindowStoreBytesIterator(underlying, serdes);
-    }
-
-    WrappedWindowStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<?, V> serdes) {
-        this.bytesIterator = bytesIterator;
-        this.serdes = serdes;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return bytesIterator.hasNext();
-    }
-
-    /**
-     * @throws NoSuchElementException if no next element exists
-     */
-    @Override
-    public KeyValue<Long, V> next() {
-        final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-        final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
-        final V value = serdes.valueFrom(next.value);
-        return KeyValue.pair(timestamp, value);
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
-    }
-
-    @Override
-    public void close() {
-        bytesIterator.close();
-    }
-
-    @Override
-    public Long peekNextKey() {
-        return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 24e0329..97a1408 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -131,23 +131,23 @@ public class KStreamWindowAggregateTest {
 
 
         assertEquals(Utils.mkList(
-                "[A@0]:0+1",
-                "[B@0]:0+2",
-                "[C@0]:0+3",
-                "[D@0]:0+4",
-                "[A@0]:0+1+1",
-
-                "[A@0]:0+1+1+1", "[A@5]:0+1",
-                "[B@0]:0+2+2", "[B@5]:0+2",
-                "[D@0]:0+4+4", "[D@5]:0+4",
-                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                "[C@0]:0+3+3", "[C@5]:0+3",
-
-                "[A@5]:0+1+1", "[A@10]:0+1",
-                "[B@5]:0+2+2+2", "[B@10]:0+2",
-                "[D@5]:0+4+4", "[D@10]:0+4",
-                "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
-                "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+                "[A@0/10]:0+1",
+                "[B@0/10]:0+2",
+                "[C@0/10]:0+3",
+                "[D@0/10]:0+4",
+                "[A@0/10]:0+1+1",
+
+                "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+                "[B@0/10]:0+2+2",   "[B@5/15]:0+2",
+                "[D@0/10]:0+4+4",   "[D@5/15]:0+4",
+                "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+                "[C@0/10]:0+3+3",   "[C@5/15]:0+3",
+
+                "[A@5/15]:0+1+1",     "[A@10/20]:0+1",
+                "[B@5/15]:0+2+2+2",   "[B@10/20]:0+2",
+                "[D@5/15]:0+4+4",     "[D@10/20]:0+4",
+                "[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2",
+                "[C@5/15]:0+3+3",     "[C@10/20]:0+3"), proc2.processed);
     }
 
     private void setRecordContext(final long time, final String topic) {
@@ -210,11 +210,11 @@ public class KStreamWindowAggregateTest {
         driver.flushState();
 
         proc1.checkAndClearProcessResult(
-                "[A@0]:0+1",
-                "[B@0]:0+2",
-                "[C@0]:0+3",
-                "[D@0]:0+4",
-                "[A@0]:0+1+1"
+                "[A@0/10]:0+1",
+                "[B@0/10]:0+2",
+                "[C@0/10]:0+3",
+                "[D@0/10]:0+4",
+                "[A@0/10]:0+1+1"
         );
         proc2.checkAndClearProcessResult();
         proc3.checkAndClearProcessResult();
@@ -236,11 +236,11 @@ public class KStreamWindowAggregateTest {
         driver.flushState();
 
         proc1.checkAndClearProcessResult(
-                "[A@0]:0+1+1+1", "[A@5]:0+1",
-                "[B@0]:0+2+2", "[B@5]:0+2",
-                "[D@0]:0+4+4", "[D@5]:0+4",
-                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                "[C@0]:0+3+3", "[C@5]:0+3"
+                "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+                "[B@0/10]:0+2+2",   "[B@5/15]:0+2",
+                "[D@0/10]:0+4+4",   "[D@5/15]:0+4",
+                "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+                "[C@0/10]:0+3+3",   "[C@5/15]:0+3"
         );
         proc2.checkAndClearProcessResult();
         proc3.checkAndClearProcessResult();
@@ -263,18 +263,18 @@ public class KStreamWindowAggregateTest {
 
         proc1.checkAndClearProcessResult();
         proc2.checkAndClearProcessResult(
-                "[A@0]:0+a",
-                "[B@0]:0+b",
-                "[C@0]:0+c",
-                "[D@0]:0+d",
-                "[A@0]:0+a+a"
+                "[A@0/10]:0+a",
+                "[B@0/10]:0+b",
+                "[C@0/10]:0+c",
+                "[D@0/10]:0+d",
+                "[A@0/10]:0+a+a"
         );
         proc3.checkAndClearProcessResult(
-                "[A@0]:0+1+1+1%0+a",
-                "[B@0]:0+2+2+2%0+b",
-                "[C@0]:0+3+3%0+c",
-                "[D@0]:0+4+4%0+d",
-                "[A@0]:0+1+1+1%0+a+a");
+                "[A@0/10]:0+1+1+1%0+a",
+                "[B@0/10]:0+2+2+2%0+b",
+                "[C@0/10]:0+3+3%0+c",
+                "[D@0/10]:0+4+4%0+d",
+                "[A@0/10]:0+1+1+1%0+a+a");
 
         setRecordContext(5, topic1);
         driver.process(topic2, "A", "a");
@@ -293,18 +293,18 @@ public class KStreamWindowAggregateTest {
         driver.flushState();
         proc1.checkAndClearProcessResult();
         proc2.checkAndClearProcessResult(
-                "[A@0]:0+a+a+a", "[A@5]:0+a",
-                "[B@0]:0+b+b", "[B@5]:0+b",
-                "[D@0]:0+d+d", "[D@5]:0+d",
-                "[B@0]:0+b+b+b", "[B@5]:0+b+b",
-                "[C@0]:0+c+c", "[C@5]:0+c"
+                "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
+                "[B@0/10]:0+b+b",   "[B@5/15]:0+b",
+                "[D@0/10]:0+d+d",   "[D@5/15]:0+d",
+                "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
+                "[C@0/10]:0+c+c",   "[C@5/15]:0+c"
         );
         proc3.checkAndClearProcessResult(
-                "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
-                "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
-                "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
-                "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
-                "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+                "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
+                "[B@0/10]:0+2+2+2%0+b+b",   "[B@5/15]:0+2+2%0+b",
+                "[D@0/10]:0+4+4%0+d+d",     "[D@5/15]:0+4%0+d",
+                "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
+                "[C@0/10]:0+3+3%0+c+c",     "[C@5/15]:0+3%0+c"
         );
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index e7bd187..3ad6475 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 
@@ -55,4 +56,9 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
     public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
         return null;
     }
+
+    @Override
+    public WindowStoreIterator<KeyValue> fetch(Object from, Object to, long timeFrom, long timeTo) {
+        return null; // no-op: all arguments are ignored and no iterator is returned
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index f8eec1c..bfc20ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -63,11 +63,15 @@ public class CachingSessionStoreTest {
     public void setUp() throws Exception {
         final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
-        underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, schema);
+        final int retention = 60000;
+        final int numSegments = 3;
+        underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
         cachingStore = new CachingSessionStore<>(sessionStore,
                                                  Serdes.String(),
-                                                 Serdes.Long());
+                                                 Serdes.Long(),
+                                                 Segments.segmentInterval(retention, numSegments)
+                                                 );
         cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
@@ -86,6 +90,8 @@ public class CachingSessionStoreTest {
         cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
         cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
 
+        assertEquals(3, cache.size());
+
         final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0);
         final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0);
 
@@ -93,7 +99,35 @@ public class CachingSessionStoreTest {
         assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), b.next());
         assertFalse(a.hasNext());
         assertFalse(b.hasNext());
+    }
+
+    @Test
+    public void shouldPutFetchAllKeysFromCache() throws Exception {
+        cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
+
+        assertEquals(3, cache.size());
+
+        final KeyValueIterator<Windowed<String>, Long> all = cachingStore.findSessions("a", "b", 0, 0);
+        assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), all.next());
+        assertEquals(KeyValue.pair(new Windowed<>("aa", new SessionWindow(0, 0)), 1L), all.next());
+        assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), all.next());
+        assertFalse(all.hasNext());
+    }
+
+    @Test
+    public void shouldPutFetchRangeFromCache() throws Exception {
+        cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
+
         assertEquals(3, cache.size());
+
+        final KeyValueIterator<Windowed<String>, Long> some = cachingStore.findSessions("aa", "b", 0, 0);
+        assertEquals(KeyValue.pair(new Windowed<>("aa", new SessionWindow(0, 0)), 1L), some.next());
+        assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L),  some.next());
+        assertFalse(some.hasNext());
     }
 
     @Test
@@ -164,6 +198,29 @@ public class CachingSessionStoreTest {
     }
 
     @Test
+    public void shouldFetchRangeCorrectlyAcrossSegments() throws Exception {
+        final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0));
+        final Windowed<String> aa1 = new Windowed<>("aa", new SessionWindow(0, 0));
+        final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
+        final Windowed<String> a3 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
+        final Windowed<String> aa3 = new Windowed<>("aa", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, 1L);
+        cachingStore.put(aa1, 1L);
+        cachingStore.put(a2, 2L);
+        cachingStore.put(a3, 3L);
+        cachingStore.put(aa3, 3L);
+        cachingStore.flush();
+
+        final KeyValueIterator<Windowed<String>, Long> rangeResults = cachingStore.findSessions("a", "aa", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
+        assertEquals(a1, rangeResults.next().key);
+        assertEquals(aa1, rangeResults.next().key);
+        assertEquals(a2, rangeResults.next().key);
+        assertEquals(a3, rangeResults.next().key);
+        assertEquals(aa3, rangeResults.next().key);
+        assertFalse(rangeResults.hasNext());
+    }
+
+    @Test
     public void shouldForwardChangedValuesDuringFlush() throws Exception {
         final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<Long>>> flushed = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 5b3123e..faf6e83 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -64,13 +64,16 @@ public class CachingWindowStoreTest {
     @Before
     public void setUp() throws Exception {
         keySchema = new WindowKeySchema();
-        underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema);
-        final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
+        final int retention = 30000;
+        final int numSegments = 3;
+        underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, keySchema);
+        final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
         cachingStore = new CachingWindowStore<>(windowStore,
                                                 Serdes.String(),
                                                 Serdes.String(),
-                                                WINDOW_SIZE);
+                                                WINDOW_SIZE,
+                                                Segments.segmentInterval(retention, numSegments));
         cachingStore.setFlushListener(cacheListener);
         cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
@@ -100,6 +103,19 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    public void shouldPutFetchRangeFromCache() throws Exception {
+        cachingStore.put("a", "a");
+        cachingStore.put("b", "b");
+
+        final KeyValueIterator<Windowed<String>, String> iterator = cachingStore.fetch("a", "b", 10, 10);
+        assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a"), iterator.next());
+        assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "b"), iterator.next());
+        assertFalse(iterator.hasNext());
+        assertEquals(2, cache.size());
+    }
+
+
+    @Test
     public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
         int added = addItemsToCache();
         // all dirty entries should have been flushed
@@ -171,6 +187,19 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    public void shouldIterateCacheAndStoreKeyRange() throws Exception {
+        final Bytes key = Bytes.wrap("1".getBytes());
+        underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes()); // written directly to the underlying store
+        cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE); // written through the caching layer, one window later
+
+        final KeyValueIterator<Windowed<String>, String> fetchRange =
+            cachingStore.fetch("1", "2", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
+        assertEquals(KeyValue.pair(new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a"), fetchRange.next());
+        assertEquals(KeyValue.pair(new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)), "b"), fetchRange.next());
+        assertFalse(fetchRange.hasNext());
+    }
+
+    @Test
     public void shouldClearNamespaceCacheOnClose() throws Exception {
         cachingStore.put("a", "a");
         assertEquals(1, cache.size());
@@ -185,12 +214,17 @@ public class CachingWindowStoreTest {
     }
 
     @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() throws Exception {
+        cachingStore.close();
+        cachingStore.fetch("a", "b", 0, 10);
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
         cachingStore.close();
         cachingStore.put("a", "a");
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldFetchAndIterateOverExactKeys() throws Exception {
         cachingStore.put("a", "0001", 0);
@@ -203,6 +237,34 @@ public class CachingWindowStoreTest {
         assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
     }
 
+    @Test
+    public void shouldFetchAndIterateOverKeyRange() throws Exception {
+        cachingStore.put("a", "0001", 0);
+        cachingStore.put("aa", "0002", 0);
+        cachingStore.put("a", "0003", 1);
+        cachingStore.put("aa", "0004", 1);
+        cachingStore.put("a", "0005", 60000);
+
+        assertThat(
+            toList(cachingStore.fetch("a", "a", 0, Long.MAX_VALUE)),
+            equalTo(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L)))
+        );
+
+        assertThat(
+            toList(cachingStore.fetch("aa", "aa", 0, Long.MAX_VALUE)),
+            equalTo(Utils.mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)))
+        );
+
+        assertThat(
+            toList(cachingStore.fetch("a", "aa", 0, Long.MAX_VALUE)),
+            equalTo(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1), windowedPair("a", "0005", 60000L)))
+        );
+    }
+
+    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp) {
+        return KeyValue.pair(new Windowed<>(key, new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), value); // window starts at timestamp and spans WINDOW_SIZE
+    }
+
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;
         int i = 0;
@@ -216,4 +278,4 @@ public class CachingWindowStoreTest {
         return i;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 6f4ff07..b6e95a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -38,6 +38,7 @@ import static org.junit.Assert.assertEquals;
 
 public class CompositeReadOnlyWindowStoreTest {
 
+    private static final long WINDOW_SIZE = 30_000;
     private final String storeName = "window-store";
     private StateStoreProviderStub stubProviderOne;
     private StateStoreProviderStub stubProviderTwo;
@@ -54,10 +55,10 @@ public class CompositeReadOnlyWindowStoreTest {
     public void before() {
         stubProviderOne = new StateStoreProviderStub(false);
         stubProviderTwo = new StateStoreProviderStub(false);
-        underlyingWindowStore = new ReadOnlyWindowStoreStub<>();
+        underlyingWindowStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
         stubProviderOne.addStore(storeName, underlyingWindowStore);
 
-        otherUnderlyingStore = new ReadOnlyWindowStoreStub<>();
+        otherUnderlyingStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
         stubProviderOne.addStore("other-window-store", otherUnderlyingStore);
 
 
@@ -89,7 +90,7 @@ public class CompositeReadOnlyWindowStoreTest {
     @Test
     public void shouldFindValueForKeyWhenMultiStores() throws Exception {
         final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
-            ReadOnlyWindowStoreStub<>();
+            ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
         stubProviderTwo.addStore(storeName, secondUnderlying);
 
         underlyingWindowStore.put("key-one", "value-one", 0L);
@@ -162,4 +163,4 @@ public class CompositeReadOnlyWindowStoreTest {
         windowStoreIterator.next();
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
index acded8c..6cc77df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -34,6 +34,18 @@ import static org.junit.Assert.assertTrue;
 
 public class FilteredCacheIteratorTest {
 
+    private static final CacheFunction IDENTITY_FUNCTION = new CacheFunction() { // pass-through: both mappings return their argument unchanged
+        @Override
+        public Bytes key(final Bytes cacheKey) {
+            return cacheKey;
+        }
+
+        @Override
+        public Bytes cacheKey(final Bytes key) {
+            return key;
+        }
+    };
+
     @SuppressWarnings("unchecked")
     private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null);
     private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()),
@@ -58,8 +70,8 @@ public class FilteredCacheIteratorTest {
             }
         };
         allIterator = new FilteredCacheIterator(
-                new DelegatingPeekingKeyValueIterator<>("",
-                                                        store.all()), allCondition);
+            new DelegatingPeekingKeyValueIterator<>("",
+                                                    store.all()), allCondition, IDENTITY_FUNCTION);
 
         final HasNextCondition firstEntryCondition = new HasNextCondition() {
             @Override
@@ -69,7 +81,7 @@ public class FilteredCacheIteratorTest {
         };
         firstEntryIterator = new FilteredCacheIterator(
                 new DelegatingPeekingKeyValueIterator<>("",
-                                                        store.all()), firstEntryCondition);
+                                                        store.all()), firstEntryCondition, IDENTITY_FUNCTION);
 
     }
 
@@ -115,4 +127,4 @@ public class FilteredCacheIteratorTest {
         allIterator.remove();
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
index d3d8f40..ee5e529 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
@@ -36,6 +36,13 @@ import static org.junit.Assert.assertTrue;
 
 public class MergedSortedCacheWrappedSessionStoreIteratorTest {
 
+    private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) { // test double: every key reports segment 0
+        @Override
+        public long segmentId(final Bytes key) {
+            return 0;
+        }
+    };
+
     private final String storeKey = "a";
     private final String cacheKey = "b";
 
@@ -43,10 +50,13 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
     private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton(
             KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator();
     private final SessionWindow cacheWindow = new SessionWindow(10, 20);
-    private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair(
-            SessionKeySerde.toBinary(
-                    new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy"), new LRUCacheEntry(cacheKey.getBytes())))
-            .iterator();
+    private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(
+        KeyValue.pair(
+            SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(
+                SessionKeySerde.toBinary(new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy")
+            ),
+            new LRUCacheEntry(cacheKey.getBytes())
+        )).iterator();
 
     @Test
     public void shouldHaveNextFromStore() throws Exception {
@@ -106,7 +116,10 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
 
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
                 = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
-        return new MergedSortedCacheSessionStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()));
+        return new MergedSortedCacheSessionStoreIterator<>(
+            cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()),
+            SINGLE_SEGMENT_CACHE_FUNCTION
+        );
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index 2048688..fed39b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -36,6 +36,13 @@ import static org.junit.Assert.assertEquals;
 
 public class MergedSortedCacheWrappedWindowStoreIteratorTest {
 
+    private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) { // test double: every key reports segment 0
+        @Override
+        public long segmentId(final Bytes key) {
+            return 0;
+        }
+    };
+
     private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>();
     private final ThreadCache cache = new ThreadCache("testCache", 1000000L,  new MockStreamsMetrics(new Metrics()));
     private final String namespace = "one";
@@ -52,16 +59,20 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
             final Bytes keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes);
             final byte[] valBytes = String.valueOf(t + 10).getBytes();
             expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
-            cache.put(namespace, keyBytes, new LRUCacheEntry(valBytes));
+            cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes));
         }
 
         Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
         Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
 
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(
+            namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)
+        );
 
-        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(
+            cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())
+        );
         int index = 0;
         while (iterator.hasNext()) {
             final KeyValue<Long, byte[]> next = iterator.next();
@@ -74,12 +85,16 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
     @Test
     public void shouldPeekNextStoreKey() throws Exception {
         windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
-        cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+        cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
         Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
         Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
-        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(
+            namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)
+        );
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(
+            cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())
+        );
         assertThat(iterator.peekNextKey(), equalTo(0L));
         iterator.next();
         assertThat(iterator.peekNextKey(), equalTo(10L));
@@ -88,15 +103,14 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
     @Test
     public void shouldPeekNextCacheKey() throws Exception {
         windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
-        cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+        cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
         Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
         Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
         final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes));
         final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
         assertThat(iterator.peekNextKey(), equalTo(0L));
         iterator.next();
         assertThat(iterator.peekNextKey(), equalTo(10L));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
new file mode 100644
index 0000000..114a150
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
+    private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) { // test double: every key reports segment 0
+        @Override
+        public long segmentId(final Bytes key) {
+            return 0;
+        }
+    };
+    private static final int WINDOW_SIZE = 10;
+
+    private final String storeKey = "a";
+    private final String cacheKey = "b";
+
+    private final TimeWindow storeWindow = new TimeWindow(0, 1);
+    private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton(
+        KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator();
+    private final TimeWindow cacheWindow = new TimeWindow(10, 20);
+    private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(
+        KeyValue.pair(
+            SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(
+                WindowStoreUtils.toBinaryKey(
+                    cacheKey, cacheWindow.start(), 0,
+                    new StateSerdes<>("dummy", Serdes.String(), Serdes.String())
+                )
+            ),
+            new LRUCacheEntry(cacheKey.getBytes())
+        )).iterator();
+
+    @Test
+    public void shouldHaveNextFromStore() throws Exception {
+        final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+            = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        assertTrue(mergeIterator.hasNext());
+    }
+
+    @Test
+    public void shouldGetNextFromStore() throws Exception {
+        final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+            = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
+    }
+
+    @Test
+    public void shouldPeekNextKeyFromStore() throws Exception {
+        final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+            = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow)));
+    }
+
+    @Test
+    public void shouldHaveNextFromCache() throws Exception {
+        final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+            = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(),
+                             cacheKvs);
+        assertTrue(mergeIterator.hasNext());
+    }
+
+    @Test
+    public void shouldGetNextFromCache() throws Exception {
+        final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+            = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
+    }
+
+    @Test
+    public void shouldPeekNextKeyFromCache() throws Exception {
+        final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+            = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow)));
+    }
+
+    @Test
+    public void shouldIterateBothStoreAndCache() throws Exception {
+        final MergedSortedCacheWindowStoreKeyValueIterator<String, String> iterator = createIterator(storeKvs, cacheKvs);
+        assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
+        assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
+        assertFalse(iterator.hasNext());
+    }
+
+    private MergedSortedCacheWindowStoreKeyValueIterator<String, String> createIterator(
+        final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
+        final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs
+    ) {
+        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator
+            = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
+
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
+            = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
+        return new MergedSortedCacheWindowStoreKeyValueIterator<>(
+            cacheIterator,
+            storeIterator,
+            new StateSerdes<>("name", Serdes.String(), Serdes.String()),
+            WINDOW_SIZE,
+            SINGLE_SEGMENT_CACHE_FUNCTION
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index e37e0b4..6974240 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -18,8 +18,11 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -28,15 +31,23 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 /**
  * A very simple window store stub for testing purposes.
  */
 public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, StateStore {
 
-    private final Map<Long, Map<K, V>> data = new HashMap<>();
+    private final long windowSize;
+    private final Map<Long, NavigableMap<K, V>> data = new HashMap<>();
     private boolean open  = true;
 
+    public ReadOnlyWindowStoreStub(final long windowSize) {
+        this.windowSize = windowSize; // added to each timestamp to form the TimeWindow end in the range fetch
+    }
+
     @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
         if (!open) {
@@ -52,9 +63,54 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
         return new TheWindowStoreIterator<>(results.iterator());
     }
 
+    /** Range scan over keys [from, to] and window start times [timeFrom, timeTo], ordered by time, then key. */
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
+        if (!open) {
+            throw new InvalidStateStoreException("Store is not open");
+        }
+        final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
+        // Walk only the stored timestamps (sorted); counting now++ up to timeTo never ends for Long.MAX_VALUE.
+        for (final Entry<Long, NavigableMap<K, V>> byTime : new TreeMap<Long, NavigableMap<K, V>>(data).entrySet()) {
+            final long now = byTime.getKey();
+            if (timeFrom <= now && now <= timeTo) {
+                for (Entry<K, V> entry : byTime.getValue().subMap(from, true, to, true).entrySet()) {
+                    results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
+                }
+            }
+        }
+        final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
+        return new KeyValueIterator<Windowed<K>, V>() {
+            @Override
+            public void close() {
+                // nothing to release: the results are already materialized in a list
+            }
+
+            @Override
+            public Windowed<K> peekNextKey() {
+                throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public KeyValue<Windowed<K>, V> next() {
+                return iterator.next();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
+            }
+        };
+    }
+
     public void put(final K key, final V value, final long timestamp) {
         if (!data.containsKey(timestamp)) {
-            data.put(timestamp, new HashMap<K, V>());
+            data.put(timestamp, new TreeMap<K, V>());
         }
         data.get(timestamp).put(key, value);
     }
@@ -123,7 +179,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
         @Override
         public void remove() {
-
+            throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index f5998dc..c30b0e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -168,13 +168,30 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L);
         sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L);
 
-        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
-        final List<Long> results = new ArrayList<>();
+        KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
+        List<Long> results = new ArrayList<>();
         while (iterator.hasNext()) {
             results.add(iterator.next().value);
         }
 
         assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L)));
+
+
+        iterator = sessionStore.findSessions("aa", 0, Long.MAX_VALUE);
+        results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            results.add(iterator.next().value);
+        }
+
+        assertThat(results, equalTo(Arrays.asList(2L, 4L)));
+
+
+        final KeyValueIterator<Windowed<String>, Long> rangeIterator = sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE);
+        final List<Long> rangeResults = new ArrayList<>();
+        while (rangeIterator.hasNext()) {
+            rangeResults.add(rangeIterator.next().value);
+        }
+        assertThat(rangeResults, equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L)));
     }
 
     static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {