You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/29 18:55:52 UTC

kafka git commit: KAFKA-4499: Add all() and fetchAll() API for querying window store

Repository: kafka
Updated Branches:
  refs/heads/trunk 58877a0de -> 0cc32abc1


KAFKA-4499: Add all() and fetchAll() API for querying window store

A rebased version of the code.

Author: RichardYuSTUG <yo...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #4258 from ConcurrencyPractitioner/trunk

Add EmptyWindowStoreIterator to NoOpWindowStore


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cc32abc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cc32abc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cc32abc

Branch: refs/heads/trunk
Commit: 0cc32abc17d26c5faf6fdc3b63a01016e05a65b9
Parents: 58877a0
Author: Richard Yu <yo...@gmail.com>
Authored: Wed Nov 29 10:45:14 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 29 10:55:43 2017 -0800

----------------------------------------------------------------------
 .../streams/state/ReadOnlyWindowStore.java      | 19 ++++
 .../state/internals/CachingWindowStore.java     | 35 ++++++++
 .../ChangeLoggingWindowBytesStore.java          | 11 ++-
 .../internals/CompositeReadOnlyWindowStore.java | 28 ++++++
 .../state/internals/MeteredWindowStore.java     | 14 +++
 .../streams/state/internals/NamedCache.java     |  5 ++
 .../internals/RocksDBSegmentedBytesStore.java   | 21 ++++-
 .../streams/state/internals/RocksDBStore.java   | 22 +++++
 .../state/internals/RocksDBWindowStore.java     | 12 +++
 .../kafka/streams/state/internals/Segment.java  |  7 +-
 .../state/internals/SegmentIterator.java        | 14 +--
 .../state/internals/SegmentedBytesStore.java    | 20 +++++
 .../state/internals/SegmentedCacheFunction.java |  2 +-
 .../kafka/streams/state/internals/Segments.java | 16 ++++
 .../state/internals/SessionKeySchema.java       |  5 +-
 .../streams/state/internals/ThreadCache.java    |  3 +-
 .../state/internals/WindowKeySchema.java        |  5 +-
 .../kafka/streams/state/NoOpWindowStore.java    | 44 +++++++++-
 .../state/internals/CachingWindowStoreTest.java | 50 ++++++++++-
 .../CompositeReadOnlyWindowStoreTest.java       | 26 ++++++
 .../internals/ReadOnlyWindowStoreStub.java      | 92 ++++++++++++++++++++
 .../RocksDBSegmentedBytesStoreTest.java         | 50 +++++++++++
 .../state/internals/RocksDBWindowStoreTest.java | 50 +++++++++++
 .../state/internals/SessionKeySchemaTest.java   |  7 ++
 .../state/internals/WindowKeySchemaTest.java    | 38 ++++++++
 .../kafka/test/SegmentedBytesStoreStub.java     | 12 +++
 26 files changed, 589 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index cff1f6c..f92ab6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -77,4 +77,23 @@ public interface ReadOnlyWindowStore<K, V> {
      * @throws NullPointerException If null is used for any key.
      */
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
+    
+    /**
+    * Gets all the key-value pairs in the existing windows.
+    *
+    * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+    * @throws InvalidStateStoreException if the store is not initialized
+    */
+    KeyValueIterator<Windowed<K>, V> all();
+    
+    /**
+     * Gets all the key-value pairs that belong to the windows within in the given time range.
+     *
+     * @param timeFrom the beginning of the time slot from which to search
+     * @param timeTo   the end of the time slot from which to search
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+     * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException if null is used for any key
+     */
+    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 31ca68b..75acd77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -216,5 +216,40 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
             }
         }
     }
+    
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+        validateStoreOpen();
 
+        final KeyValueIterator<Windowed<Bytes>, byte[]>  underlyingIterator = underlying.all();
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
+
+        return new MergedSortedCacheWindowStoreKeyValueIterator(
+            cacheIterator,
+            underlyingIterator,
+            bytesSerdes,
+            windowSize,
+            cacheFunction
+        );
+    }
+    
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+        validateStoreOpen();
+        
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetchAll(timeFrom, timeTo);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
+        
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator,
+                                                                                                              hasNextCondition,
+                                                                                                              cacheFunction);
+        return new MergedSortedCacheWindowStoreKeyValueIterator(
+                filteredCacheIterator,
+                underlyingIterator,
+                bytesSerdes,
+                windowSize,
+                cacheFunction
+        );
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 0035019..4fe4b99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -56,7 +56,16 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
         return bytesStore.fetch(keyFrom, keyTo, from, to);
     }
 
-
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+        return bytesStore.all();
+    }
+    
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+        return bytesStore.fetchAll(timeFrom, timeTo);
+    }
+    
     @Override
     public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 5acb6b4..6afc6fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -80,4 +80,32 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
                                                                provider.stores(storeName, windowStoreType).iterator(),
                                                                nextIteratorFunction));
     }
+    
+    @Override
+    public KeyValueIterator<Windowed<K>, V> all() {
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
+            @Override
+            public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
+                return store.all();
+            }
+        };
+        return new DelegatingPeekingKeyValueIterator<>(storeName,
+                new CompositeKeyValueIterator<>(
+                        provider.stores(storeName, windowStoreType).iterator(),
+                        nextIteratorFunction));
+    }
+    
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
+            @Override
+            public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
+                return store.fetchAll(timeFrom, timeTo);
+            }
+        };
+        return new DelegatingPeekingKeyValueIterator<>(storeName,
+                new CompositeKeyValueIterator<>(
+                        provider.stores(storeName, windowStoreType).iterator(),
+                        nextIteratorFunction));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 20c7c43..e890005 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -113,6 +113,20 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
+    public KeyValueIterator<Windowed<K>, V> all() {
+        return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time);
+    }
+    
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+        return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom, timeTo), 
+                                                     fetchTime, 
+                                                     metrics, 
+                                                     serdes, 
+                                                     time);
+    }
+    
+    @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
                                                      fetchTime,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 47d5152..f838c55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -278,6 +279,10 @@ class NamedCache {
     synchronized Iterator<Bytes> allKeys() {
         return keySetIterator(cache.navigableKeySet());
     }
+    
+    synchronized NavigableSet<Bytes> keySet() {
+        return cache.navigableKeySet();
+    }
 
     synchronized LRUCacheEntry first() {
         if (head == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 4d4ee41..865703c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -65,7 +65,26 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
                                    keySchema.hasNextCondition(keyFrom, keyTo, from, to),
                                    binaryFrom, binaryTo);
     }
-
+    
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        
+        final List<Segment> searchSpace = segments.allSegments();
+        
+        return new SegmentIterator(searchSpace.iterator(),
+                                   keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
+                                   null, null);
+    }
+    
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+        final List<Segment> searchSpace = segments.segments(timeFrom, timeTo);
+        
+        return new SegmentIterator(searchSpace.iterator(),
+                                   keySchema.hasNextCondition(null, null, timeFrom, timeTo),
+                                   null, null);
+    }
+    
     @Override
     public void remove(final Bytes key) {
         final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ea01694..4c7e61b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -387,6 +387,28 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         return rocksDbIterator;
     }
 
+    public synchronized KeyValue<K, V> first() {
+        validateStoreOpen();
+        
+        RocksIterator innerIter = db.newIterator();
+        innerIter.seekToFirst();
+        KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value()));
+        innerIter.close();
+
+        return pair;
+    }
+
+    public synchronized KeyValue<K, V> last() {
+        validateStoreOpen();
+        
+        RocksIterator innerIter = db.newIterator();
+        innerIter.seekToLast();
+        KeyValue<K, V> pair = new KeyValue<>(serdes.keyFrom(innerIter.key()), serdes.valueFrom(innerIter.value()));
+        innerIter.close();
+
+        return pair;
+    }
+
     /**
      * Return an approximate count of key-value mappings in this store.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b7dd532..f1a9c63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -118,6 +118,18 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
+    
+    @Override
+    public KeyValueIterator<Windowed<K>, V> all() {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
+        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
+    }
+    
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetchAll(timeFrom, timeTo);
+        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
+    }
 
     void maybeUpdateSeqnumForDups() {
         if (retainDuplicates) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index b9cc8b7..50c1547 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import java.io.IOException;
 
 // Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
-class Segment extends RocksDBStore<Bytes, byte[]> {
+class Segment extends RocksDBStore<Bytes, byte[]> implements Comparable<Segment> {
     public final long id;
 
     Segment(String segmentName, String windowName, long id) {
@@ -36,6 +36,11 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
     }
 
     @Override
+    public int compareTo(Segment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
     public void openDB(final ProcessorContext context) {
         super.openDB(context);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 9d0b6cd..099cba1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -32,11 +32,11 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
 
     private final Bytes from;
     private final Bytes to;
-    private final Iterator<Segment> segments;
-    private final HasNextCondition hasNextCondition;
+    protected final Iterator<Segment> segments;
+    protected final HasNextCondition hasNextCondition;
 
-    private KeyValueStore<Bytes, byte[]> currentSegment;
-    private KeyValueIterator<Bytes, byte[]> currentIterator;
+    protected KeyValueStore<Bytes, byte[]> currentSegment;
+    protected KeyValueIterator<Bytes, byte[]> currentIterator;
 
     SegmentIterator(final Iterator<Segment> segments,
                     final HasNextCondition hasNextCondition,
@@ -71,7 +71,11 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
             close();
             currentSegment = segments.next();
             try {
-                currentIterator = currentSegment.range(from, to);
+                if (from == null || to == null) {
+                    currentIterator = currentSegment.all();
+                } else {
+                    currentIterator = currentSegment.range(from, to);
+                }
             } catch (InvalidStateStoreException e) {
                 // segment may have been closed so we ignore it.
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 72ae6e2..19cf319 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
@@ -49,6 +50,25 @@ public interface SegmentedBytesStore extends StateStore {
      * @return  an iterator over key-value pairs
      */
     KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to);
+    
+    /**
+     * Gets all the key-value pairs in the existing windows.
+     *
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+     * @throws InvalidStateStoreException if the store is not initialized
+     */
+    KeyValueIterator<Bytes, byte[]> all();
+    
+    /**
+     * Gets all the key-value pairs that belong to the windows within in the given time range.
+     *
+     * @param timeFrom the beginning of the time slot from which to search
+     * @param timeTo   the end of the time slot from which to search
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+     * @throws InvalidStateStoreException if the store is not initialized
+     * @throws NullPointerException if null is used for any key
+     */
+    KeyValueIterator<Bytes, byte[]> fetchAll(final long from, final long to);
 
     /**
      * Remove the record with the provided key. The key

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
index 8571f92..3ab7930 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
@@ -52,7 +52,7 @@ class SegmentedCacheFunction implements CacheFunction {
         System.arraycopy(cacheKey.get(), SEGMENT_ID_BYTES, binaryKey, 0, binaryKey.length);
         return binaryKey;
     }
-
+    
     public long segmentId(Bytes key) {
         return keySchema.segmentTimestamp(key) / segmentInterval;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 5993972..2cf0913 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -28,6 +28,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.SimpleTimeZone;
@@ -143,6 +144,21 @@ class Segments {
         return segments;
     }
 
+    List<Segment> allSegments() {
+        final List<Segment> segments = new ArrayList<>();
+        for (Segment segment : this.segments.values()) {
+            if (segment.isOpen()) {
+                try {
+                    segments.add(segment);
+                } catch (InvalidStateStoreException ise) {
+                    // segment may have been closed by streams thread;
+                }
+            }
+        }
+        Collections.sort(segments);
+        return segments;
+    }
+    
     void flush() {
         for (Segment segment : segments.values()) {
             segment.flush();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 6d6d9bf..e3dd553 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -79,8 +79,8 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
                 while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
                     final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
-                    if (windowedKey.key().compareTo(binaryKeyFrom) >= 0
-                        && windowedKey.key().compareTo(binaryKeyTo) <= 0
+                    if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
+                        && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
                         && windowedKey.window().end() >= from
                         && windowedKey.window().start() <= to) {
                         return true;
@@ -96,5 +96,4 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
     public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
         return segments.segments(from, Long.MAX_VALUE);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 01a4bef..b1fd198 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -195,8 +195,7 @@ public class ThreadCache {
         }
         return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
     }
-
-
+    
     public long size() {
         long size = 0;
         for (NamedCache cache : caches.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 214f36b..739792f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -75,8 +75,8 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
                     final Bytes bytes = iterator.peekNextKey();
                     final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
                     final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
-                    if (keyBytes.compareTo(binaryKeyFrom) >= 0
-                        && keyBytes.compareTo(binaryKeyTo) <= 0
+                    if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
+                        && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
                         && time >= from
                         && time <= to) {
                         return true;
@@ -92,4 +92,5 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
     public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
         return segments.segments(from, to);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index 3ad6475..1ded31f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -20,8 +20,38 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 
+import java.util.NoSuchElementException;
+
 public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
 
+    private static class EmptyWindowStoreIterator implements WindowStoreIterator<KeyValue> {
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public Long peekNextKey() {
+            throw new NoSuchElementException();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return false;
+        }
+
+        @Override
+        public KeyValue<Long, KeyValue> next() {
+            throw new NoSuchElementException();
+        }
+
+        @Override
+        public void remove() {
+        }
+    }
+
+    private static final WindowStoreIterator<KeyValue> EMPTY_WINDOW_STORE_ITERATOR = new EmptyWindowStoreIterator();
+
     @Override
     public String name() {
         return "";
@@ -54,11 +84,21 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
 
     @Override
     public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
-        return null;
+        return EMPTY_WINDOW_STORE_ITERATOR;
     }
 
     @Override
     public WindowStoreIterator<KeyValue> fetch(Object from, Object to, long timeFrom, long timeTo) {
-        return null;
+        return EMPTY_WINDOW_STORE_ITERATOR;
+    }
+    
+    @Override
+    public WindowStoreIterator<KeyValue> all() {
+        return EMPTY_WINDOW_STORE_ITERATOR;
+    }
+    
+    @Override
+    public WindowStoreIterator<KeyValue> fetchAll(long timeFrom, long timeTo) {
+        return EMPTY_WINDOW_STORE_ITERATOR;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index f1a0038..239a007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -129,7 +129,55 @@ public class CachingWindowStoreTest {
         assertFalse(iterator.hasNext());
         assertEquals(2, cache.size());
     }
-
+    
+    @Test
+    public void shouldGetAllFromCache() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"));
+        cachingStore.put(bytesKey("b"), bytesValue("b"));
+        cachingStore.put(bytesKey("c"), bytesValue("c"));
+        cachingStore.put(bytesKey("d"), bytesValue("d"));
+        cachingStore.put(bytesKey("e"), bytesValue("e"));
+        cachingStore.put(bytesKey("f"), bytesValue("f"));
+        cachingStore.put(bytesKey("g"), bytesValue("g"));
+        cachingStore.put(bytesKey("h"), bytesValue("h"));
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
+        String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        for (String s : array) {
+            verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), s);
+        }
+        assertFalse(iterator.hasNext());
+    }
+    
+    @Test
+    public void shouldFetchAllWithinTimestampRange() {
+        String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        for (int i = 0; i < array.length; i++) {
+            context.setTime(i);
+            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
+        }
+        
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetchAll(0, 7);
+        for (int i = 0; i < array.length; i++) {
+            String str = array[i];
+            verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+        }
+        assertFalse(iterator.hasNext());
+        
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 = cachingStore.fetchAll(2, 4);
+        for (int i = 2; i <= 4; i++) {
+            String str = array[i];
+            verifyWindowedKeyValue(iterator1.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+        }
+        assertFalse(iterator1.hasNext());
+        
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 = cachingStore.fetchAll(5, 7);
+        for (int i = 5; i <= 7; i++) {
+            String str = array[i];
+            verifyWindowedKeyValue(iterator2.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+        }
+        assertFalse(iterator2.hasNext());
+    }
 
     @Test
     public void shouldFlushEvictedItemsIntoUnderlyingStore() throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index f887858..58fddaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -179,6 +179,32 @@ public class CompositeReadOnlyWindowStoreTest {
                 KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
                 KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
     }
+    
+    @Test
+    public void shouldGetAllAcrossStores() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
+                ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        List<KeyValue<Windowed<String>, String>> results = StreamsTestUtils.toList(windowStore.all());
+        assertThat(results, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+                KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
+    }
+    
+    @Test
+    public void shouldFetchAllAcrossStores() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
+                ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        List<KeyValue<Windowed<String>, String>> results = StreamsTestUtils.toList(windowStore.fetchAll(0, 10));
+        assertThat(results, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+                KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
+    }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNPEIfKeyIsNull() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 6974240..256df33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -47,6 +47,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
     public ReadOnlyWindowStoreStub(long windowSize) {
         this.windowSize = windowSize;
     }
+    
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
@@ -64,6 +65,97 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
     }
 
     @Override
+    public KeyValueIterator<Windowed<K>, V> all() {
+        if (!open) {
+            throw new InvalidStateStoreException("Store is not open");
+        }
+        final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
+        for (long now : data.keySet()) {
+            final NavigableMap<K, V> kvMap = data.get(now);
+            if (kvMap != null) {
+                for (Entry<K, V> entry : kvMap.entrySet()) {
+                    results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
+                }
+            }
+        }
+        final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
+
+        return new KeyValueIterator<Windowed<K>, V>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Windowed<K> peekNextKey() {
+                throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public KeyValue<Windowed<K>, V> next() {
+                return iterator.next();
+            }
+
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
+            }
+        };
+    }
+    
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
+        if (!open) {
+            throw new InvalidStateStoreException("Store is not open");
+        }
+        final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
+        for (long now : data.keySet()) {
+            if (!(now >= timeFrom && now <= timeTo)) continue;
+            final NavigableMap<K, V> kvMap = data.get(now);
+            if (kvMap != null) {
+                for (Entry<K, V> entry : kvMap.entrySet()) {
+                    results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
+                }
+            }
+        }
+        final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
+
+        return new KeyValueIterator<Windowed<K>, V>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Windowed<K> peekNextKey() {
+                throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public KeyValue<Windowed<K>, V> next() {
+                return iterator.next();
+            }
+
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
+            }
+        };
+    }
+
+    @Override
     public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
         if (!open) {
             throw new InvalidStateStoreException("Store is not open");

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index bf3386d..d8d291c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -156,6 +156,55 @@ public class RocksDBSegmentedBytesStoreTest {
     }
 
     @Test
+    public void shouldGetAllSegments() {
+        // just to validate directories
+        final Segments segments = new Segments(storeName, retention, numSegments);
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+        assertEquals(Utils.mkSet(segments.segmentName(0),
+                                 segments.segmentName(1)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
+        assertEquals(Utils.mkSet(segments.segmentName(0),
+                                 segments.segmentName(1),
+                                 segments.segmentName(2)), segmentDirs());
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
+                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L),
+                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L)
+                                                 ), results);
+
+    }
+
+    @Test
+    public void shouldFetchAllSegments() {
+        // just to validate directories
+        final Segments segments = new Segments(storeName, retention, numSegments);
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+        assertEquals(Utils.mkSet(segments.segmentName(0),
+                                 segments.segmentName(1)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
+        assertEquals(Utils.mkSet(segments.segmentName(0),
+                                 segments.segmentName(1),
+                                 segments.segmentName(2)), segmentDirs());
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60000L));
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
+                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L)
+                                                 ), results);
+
+    }
+
+    @Test
     public void shouldLoadSegementsWithOldStyleDateFormattedName() {
         final Segments segments = new Segments(storeName, retention, numSegments);
         final String key = "a";
@@ -209,6 +258,7 @@ public class RocksDBSegmentedBytesStoreTest {
             KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
     }
 
+
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 7736d9d..3b7e2c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -190,6 +190,56 @@ public class RocksDBWindowStoreTest {
         assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
         assertNull(entriesByKey.get(6));
     }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldGetAll() throws IOException {
+        windowStore = createWindowStore(context, false, true);
+        long startTime = segmentSize - 4L;
+        
+        putFirstBatch(windowStore, startTime, context);
+        
+        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
+        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        
+        assertEquals(
+                Utils.mkList(zero, one, two, four, five),
+                StreamsTestUtils.toList(windowStore.all())
+        );
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFetchAllInTimeRange() throws IOException {
+        windowStore = createWindowStore(context, false, true);
+        long startTime = segmentSize - 4L;
+        
+        putFirstBatch(windowStore, startTime, context);
+        
+        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
+        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        
+        assertEquals(
+                Utils.mkList(one, two, four),
+                StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 4))
+        );
+        
+        assertEquals(
+                Utils.mkList(zero, one, two),
+                StreamsTestUtils.toList(windowStore.fetchAll(startTime + 0, startTime + 3))
+        );
+        
+        assertEquals(
+                Utils.mkList(one, two, four, five),
+                StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 5))
+        );
+    }
 
     @SuppressWarnings("unchecked")
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index c3f52c9..3b731f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -66,6 +66,13 @@ public class SessionKeySchemaTest {
     }
 
     @Test
+    public void shouldFetchAllKeysUsingNullKeys() {
+        final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
+        final List<Integer> results = getValues(hasNextCondition);
+        assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
+    }
+    
+    @Test
     public void testUpperBoundWithLargeTimestamps() {
         Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 9720a9f..d75cca0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -18,14 +18,44 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 public class WindowKeySchemaTest {
 
     private final WindowKeySchema windowKeySchema = new WindowKeySchema();
+    private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
+
+    @Before
+    public void before() {
+        windowKeySchema.init("topic");
+        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6));
+        iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
+    }
+    
+    @Test
+    public void testHasNextConditionUsingNullKeys() {
+        final HasNextCondition hasNextCondition = windowKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
+        final List<Integer> results = getValues(hasNextCondition);
+        assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
+    }
 
     @Test
     public void testUpperBoundWithLargeTimestamps() {
@@ -128,4 +158,12 @@ public class WindowKeySchemaTest {
 
         assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
     }
+    
+    private List<Integer> getValues(final HasNextCondition hasNextCondition) {
+        final List<Integer> results = new ArrayList<>();
+        while (hasNextCondition.hasNext(iterator)) {
+            results.add(iterator.next().value);
+        }
+        return results;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cc32abc/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
index 7f8457c..4709ef3 100644
--- a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
@@ -58,6 +58,18 @@ public class SegmentedBytesStoreStub implements SegmentedBytesStore {
         fetchCalled = true;
         return new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
     }
+    
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        fetchCalled = true;
+        return new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
+    }
+    
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetchAll(long timeFrom, long timeTo) {
+        fetchCalled = true;
+        return new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
+    }
 
     @Override
     public void remove(final Bytes key) {