You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/22 16:16:53 UTC

[kafka] branch trunk updated: KAFKA-13211: add support for infinite range query for WindowStore (#11227)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b61ec00  KAFKA-13211: add support for infinite range query for WindowStore (#11227)
b61ec00 is described below

commit b61ec0003f907b61243102811fdb2e92f8d7d2c5
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Sep 23 00:14:58 2021 +0800

    KAFKA-13211: add support for infinite range query for WindowStore (#11227)
    
    Add support for infinite range query for WindowStore. Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13210
    
    Reviewers: Patrick Stuedi <ps...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/streams/state/ReadOnlyWindowStore.java   |   6 +-
 .../apache/kafka/streams/state/WindowStore.java    |   3 +-
 .../AbstractRocksDBSegmentedBytesStore.java        |   6 +-
 .../state/internals/CachingWindowStore.java        |  20 +--
 .../internals/CompositeReadOnlyWindowStore.java    |   4 -
 .../state/internals/InMemoryWindowStore.java       |  40 +++--
 .../state/internals/MeteredWindowStore.java        |  16 +-
 .../streams/state/internals/SegmentIterator.java   |  14 +-
 .../AbstractRocksDBSegmentedBytesStoreTest.java    | 136 ++++++++++++++++-
 .../internals/AbstractWindowBytesStoreTest.java    |  58 +++++--
 .../CachingPersistentWindowStoreTest.java          | 167 ++++++++++++++++++---
 .../CompositeReadOnlyWindowStoreTest.java          | 103 +++++++++++--
 .../state/internals/MeteredWindowStoreTest.java    |  39 +++--
 .../state/internals/ReadOnlyWindowStoreStub.java   |  32 +++-
 .../state/internals/SegmentIteratorTest.java       | 145 ++++++++++++++++++
 .../state/internals/WindowStoreFetchTest.java      |  81 ++++++++++
 .../org/apache/kafka/test/StreamsTestUtils.java    |  20 +++
 17 files changed, 773 insertions(+), 117 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index aa84bfc..3df170d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -128,12 +128,13 @@ public interface ReadOnlyWindowStore<K, V> {
      * This iterator must be closed after use.
      *
      * @param keyFrom     the first key in the range
+     *                    A null value indicates a starting position from the first element in the store.
      * @param keyTo       the last key in the range
+     *                    A null value indicates that the range ends with the last element in the store.
      * @param timeFrom time range start (inclusive), where iteration starts.
      * @param timeTo   time range end (inclusive), where iteration ends.
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time.
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException       if {@code null} is used for any key.
      * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}
      */
     KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
@@ -146,12 +147,13 @@ public interface ReadOnlyWindowStore<K, V> {
      * This iterator must be closed after use.
      *
      * @param keyFrom     the first key in the range
+     *                    A null value indicates a starting position from the first element in the store.
      * @param keyTo       the last key in the range
+     *                    A null value indicates that the range ends with the last element in the store.
      * @param timeFrom time range start (inclusive), where iteration ends.
      * @param timeTo   time range end (inclusive), where iteration starts.
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time.
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException       if {@code null} is used for any key.
      * @throws IllegalArgumentException   if duration is negative or can't be represented as {@code long milliseconds}
      */
     default KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 31c6eb1..86c82fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -117,12 +117,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * This iterator must be closed after use.
      *
      * @param keyFrom     the first key in the range
+     *                    A null value indicates a starting position from the first element in the store.
      * @param keyTo       the last key in the range
+     *                    A null value indicates that the range ends with the last element in the store.
      * @param timeFrom time range start (inclusive)
      * @param timeTo   time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException       if one of the given keys is {@code null}
      */
     // WindowStore keeps a long-based implementation of ReadOnlyWindowStore#fetch Instant-based
     // if super#fetch is removed, keep this implementation as it serves PAPI Stores.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index f7aef11..bfee6b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -114,7 +114,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
                                           final long from,
                                           final long to,
                                           final boolean forward) {
-        if (keyFrom.compareTo(keyTo) > 0) {
+        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
             LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
                 "This may be due to range arguments set in the wrong order, " +
                 "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
@@ -124,8 +124,8 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
 
         final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
 
-        final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
-        final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+        final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
+        final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);
 
         return new SegmentIterator<>(
             searchSpace.iterator(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 59604a5..fa04ac8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -244,7 +244,7 @@ class CachingWindowStore
                                                            final Bytes keyTo,
                                                            final long timeFrom,
                                                            final long timeTo) {
-        if (keyFrom.compareTo(keyTo) > 0) {
+        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
             LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
                 "This may be due to range arguments set in the wrong order, " +
                 "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
@@ -266,8 +266,8 @@ class CachingWindowStore
             new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, true) :
             context.cache().range(
                 cacheName,
-                cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
-                cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
+                keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
+                keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
             );
 
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
@@ -289,7 +289,7 @@ class CachingWindowStore
                                                                    final Bytes keyTo,
                                                                    final long timeFrom,
                                                                    final long timeTo) {
-        if (keyFrom.compareTo(keyTo) > 0) {
+        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
             LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
                 + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
                 "Note that the built-in numerical serdes do not follow this for negative numbers");
@@ -310,8 +310,8 @@ class CachingWindowStore
             new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false) :
             context.cache().reverseRange(
                 cacheName,
-                cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
-                cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
+                keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
+                keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
             );
 
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
@@ -573,12 +573,14 @@ class CachingWindowStore
                 throw new IllegalStateException("Error iterating over segments: segment interval has changed");
             }
 
-            if (keyFrom.equals(keyTo)) {
+            if (keyFrom != null && keyTo != null && keyFrom.equals(keyTo)) {
                 cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
                 cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
             } else {
-                cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
-                cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
+                cacheKeyFrom = keyFrom == null ? null :
+                    cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
+                cacheKeyTo = keyTo == null ? null :
+                    cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 4227855..c6b0b60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -115,8 +115,6 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
                                                   final K keyTo,
                                                   final Instant timeFrom,
                                                   final Instant timeTo) {
-        Objects.requireNonNull(keyFrom, "keyFrom can't be null");
-        Objects.requireNonNull(keyTo, "keyTo can't be null");
         final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
             store -> store.fetch(keyFrom, keyTo, timeFrom, timeTo);
         return new DelegatingPeekingKeyValueIterator<>(
@@ -131,8 +129,6 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
                                                           final K keyTo,
                                                           final Instant timeFrom,
                                                           final Instant timeTo) throws IllegalArgumentException {
-        Objects.requireNonNull(keyFrom, "keyFrom can't be null");
-        Objects.requireNonNull(keyTo, "keyTo can't be null");
         final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction =
             store -> store.backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
         return new DelegatingPeekingKeyValueIterator<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index ae37542..5327e75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -213,12 +213,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
                                                     final long timeFrom,
                                                     final long timeTo,
                                                     final boolean forward) {
-        Objects.requireNonNull(from, "from key cannot be null");
-        Objects.requireNonNull(to, "to key cannot be null");
-
         removeExpiredSegments();
 
-        if (from.compareTo(to) > 0) {
+        if (from != null && to != null && from.compareTo(to) > 0) {
             LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
                 "This may be due to range arguments set in the wrong order, " +
                 "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
@@ -397,7 +394,8 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         final Bytes to = (retainDuplicates && keyTo != null) ? wrapForDups(keyTo, Integer.MAX_VALUE) : keyTo;
 
         final WrappedWindowedKeyValueIterator iterator =
-            new WrappedWindowedKeyValueIterator(from,
+            new WrappedWindowedKeyValueIterator(
+                from,
                 to,
                 segmentIterator,
                 openIterators::remove,
@@ -462,7 +460,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             }
 
             final Bytes key = getKey(next.key);
-            if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) {
+            if (isKeyWithinRange(key)) {
                 return true;
             } else {
                 next = null;
@@ -470,6 +468,23 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             }
         }
 
+        private boolean isKeyWithinRange(final Bytes key) {
+            // split all cases for readability and avoid BooleanExpressionComplexity checkstyle warning
+            if (keyFrom == null && keyTo == null) {
+                // fetch all
+                return true;
+            } else if (keyFrom == null) {
+                // start from the beginning
+                return key.compareTo(getKey(keyTo)) <= 0;
+            } else if (keyTo == null) {
+                // end to the last
+                return key.compareTo(getKey(keyFrom)) >= 0;
+            } else {
+                // key is within the range
+                return key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0;
+            }
+        }
+
         public void close() {
             next = null;
             recordIterator = null;
@@ -499,9 +514,16 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
             currentTime = currentSegment.getKey();
 
-            final ConcurrentNavigableMap<Bytes, byte[]> subMap = allKeys ?
-                currentSegment.getValue() :
-                currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+            final ConcurrentNavigableMap<Bytes, byte[]> subMap;
+            if (allKeys) { // keyFrom == null && keyTo == null
+                subMap = currentSegment.getValue();
+            } else if (keyFrom == null) {
+                subMap = currentSegment.getValue().headMap(keyTo, true);
+            } else if (keyTo == null) {
+                subMap = currentSegment.getValue().tailMap(keyFrom, true);
+            } else {
+                subMap = currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+            }
 
             if (forward) {
                 return subMap.entrySet().iterator();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 24c17f2..48b42fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -245,10 +245,12 @@ public class MeteredWindowStore<K, V>
                                                   final K keyTo,
                                                   final long timeFrom,
                                                   final long timeTo) {
-        Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
-        Objects.requireNonNull(keyTo, "keyTo cannot be null");
         return new MeteredWindowedKeyValueIterator<>(
-            wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo),
+            wrapped().fetch(
+                keyFrom == null ? null : keyBytes(keyFrom),
+                keyTo == null ? null : keyBytes(keyTo),
+                timeFrom,
+                timeTo),
             fetchSensor,
             streamsMetrics,
             serdes,
@@ -260,10 +262,12 @@ public class MeteredWindowStore<K, V>
                                                           final K keyTo,
                                                           final long timeFrom,
                                                           final long timeTo) {
-        Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
-        Objects.requireNonNull(keyTo, "keyTo cannot be null");
         return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo),
+            wrapped().backwardFetch(
+                keyFrom == null ? null : keyBytes(keyFrom),
+                keyTo == null ? null : keyBytes(keyTo),
+                timeFrom,
+                timeTo),
             fetchSensor,
             streamsMetrics,
             serdes,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 03b66a6..6191c49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -74,18 +74,10 @@ class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, byte
             close();
             currentSegment = segments.next();
             try {
-                if (from == null || to == null) {
-                    if (forward) {
-                        currentIterator = currentSegment.all();
-                    } else {
-                        currentIterator = currentSegment.reverseAll();
-                    }
+                if (forward) {
+                    currentIterator = currentSegment.range(from, to);
                 } else {
-                    if (forward) {
-                        currentIterator = currentSegment.range(from, to);
-                    } else {
-                        currentIterator = currentSegment.reverseRange(from, to);
-                    }
+                    currentIterator = currentSegment.reverseRange(from, to);
                 }
             } catch (final InvalidStateStoreException e) {
                 // segment may have been closed so we ignore it.
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 514bb79..0640a46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -145,16 +145,138 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
     @Test
     public void shouldPutAndFetch() {
-        final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
-        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
-        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+        final String keyA = "a";
+        final String keyB = "b";
+        final String keyC = "c";
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10));
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100));
+        bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200));
 
-        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500)) {
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+            Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
 
             final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+            Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+            null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+            Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
+            null, null, 0, windows[3].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+    }
+
+    @Test
+    public void shouldPutAndBackwardFetch() {
+        final String keyA = "a";
+        final String keyB = "b";
+        final String keyC = "c";
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10));
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100));
+        bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200));
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+            Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+            Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+            null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+            Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
+            null, null, 0, windows[3].start())) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
+                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
+                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
             );
 
             assertEquals(expected, toList(values));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 153db97..e93f758 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -436,6 +436,30 @@ public abstract class AbstractWindowBytesStoreTest {
                 ofEpochMilli(defaultStartTime + 3L),
                 ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
         );
+        assertEquals(
+            asList(zero, one, two),
+            toList(windowStore.fetch(
+                null,
+                2,
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L)))
+        );
+        assertEquals(
+            asList(two, three, four, five),
+            toList(windowStore.fetch(
+                2,
+                null,
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+        );
+        assertEquals(
+            asList(zero, one, two, three, four, five),
+            toList(windowStore.fetch(
+                null,
+                null,
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+        );
     }
 
     @Test
@@ -506,6 +530,30 @@ public abstract class AbstractWindowBytesStoreTest {
                 ofEpochMilli(defaultStartTime + 3L),
                 ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
         );
+        assertEquals(
+            asList(two, one, zero),
+            toList(windowStore.backwardFetch(
+                null,
+                2,
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L)))
+        );
+        assertEquals(
+            asList(five, four, three, two),
+            toList(windowStore.backwardFetch(
+                2,
+                null,
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+        );
+        assertEquals(
+            asList(five, four, three, two, one, zero),
+            toList(windowStore.backwardFetch(
+                null,
+                null,
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L)))
+        );
     }
 
     @Test
@@ -861,16 +909,6 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 2, ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-
-    @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-
-    @Test
     public void shouldFetchAndIterateOverExactBinaryKeys() {
         final WindowStore<Bytes, String> windowStore = buildWindowStore(RETENTION_PERIOD,
             WINDOW_SIZE,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 023d69a..2d64a44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -54,6 +54,7 @@ import org.junit.Test;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -64,6 +65,7 @@ import static java.time.Instant.ofEpochMilli;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
 import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -283,21 +285,158 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
 
         try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
-                 cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) {
-            verifyWindowedKeyValue(
-                iterator.next(),
+                 cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
                 new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-                "a");
-            verifyWindowedKeyValue(
-                iterator.next(),
-                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-                "b");
-            assertFalse(iterator.hasNext());
+                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("a", "b");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
             assertEquals(2, cache.size());
         }
     }
 
     @Test
+    public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("a", "b", "c", "d");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutFetchRangeFromCacheForNullKeyTo() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("b", "c", "d", "e");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.fetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("a", "b", "c", "d", "e");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetch(null, bytesKey("c"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("c", "b", "a");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetch(bytesKey("c"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("e", "d", "c");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("e", "d", "c", "b", "a");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
     public void shouldGetAllFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
         cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
@@ -814,16 +953,6 @@ public class CachingPersistentWindowStoreTest {
     }
 
     @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-        assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, bytesKey("anyTo"), ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-
-    @Test
-    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-        assertThrows(NullPointerException.class, () -> cachingStore.fetch(bytesKey("anyFrom"), null, ofEpochMilli(1L), ofEpochMilli(2L)));
-    }
-
-    @Test
     public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
         final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
         final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 73439b1..db15276 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -86,6 +86,7 @@ public class CompositeReadOnlyWindowStoreTest {
         );
     }
 
+
     @Test
     public void shouldBackwardFetchValuesFromWindowStore() {
         underlyingWindowStore.put("my-key", "my-value", 0L);
@@ -354,6 +355,97 @@ public class CompositeReadOnlyWindowStoreTest {
     }
 
     @Test
+    public void shouldFetchKeyRangeAcrossStoresWithNullKeyTo() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        secondUnderlying.put("c", "c", 10L);
+        final List<KeyValue<Windowed<String>, String>> results =
+            StreamsTestUtils.toList(windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(10)));
+        assertThat(results, equalTo(Arrays.asList(
+            KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+            KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"),
+            KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"))));
+    }
+
+    @Test
+    public void shouldFetchKeyRangeAcrossStoresWithNullKeyFrom() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        secondUnderlying.put("c", "c", 10L);
+        final List<KeyValue<Windowed<String>, String>> results =
+            StreamsTestUtils.toList(windowStore.fetch(null, "c", ofEpochMilli(0), ofEpochMilli(10)));
+        assertThat(results, equalTo(Arrays.asList(
+            KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+            KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"),
+            KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"))));
+    }
+
+    @Test
+    public void shouldFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        secondUnderlying.put("c", "c", 10L);
+        final List<KeyValue<Windowed<String>, String>> results =
+            StreamsTestUtils.toList(windowStore.fetch(null, null, ofEpochMilli(0), ofEpochMilli(10)));
+        assertThat(results, equalTo(Arrays.asList(
+            KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+            KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"),
+            KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"))));
+    }
+
+    @Test
+    public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyTo() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        secondUnderlying.put("c", "c", 10L);
+        final List<KeyValue<Windowed<String>, String>> results =
+            StreamsTestUtils.toList(windowStore.backwardFetch("a", null, ofEpochMilli(0), ofEpochMilli(10)));
+        assertThat(results, equalTo(Arrays.asList(
+            KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+            KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"),
+            KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
+    }
+
+    @Test
+    public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFrom() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        secondUnderlying.put("c", "c", 10L);
+        final List<KeyValue<Windowed<String>, String>> results =
+            StreamsTestUtils.toList(windowStore.backwardFetch(null, "c", ofEpochMilli(0), ofEpochMilli(10)));
+        assertThat(results, equalTo(Arrays.asList(
+            KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+            KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"),
+            KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")
+            )));
+    }
+
+    @Test
+    public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 10L);
+        secondUnderlying.put("c", "c", 10L);
+        final List<KeyValue<Windowed<String>, String>> results =
+            StreamsTestUtils.toList(windowStore.backwardFetch(null, null, ofEpochMilli(0), ofEpochMilli(10)));
+        assertThat(results, equalTo(Arrays.asList(
+            KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+            KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"),
+            KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
+    }
+
+    @Test
     public void shouldBackwardFetchKeyRangeAcrossStores() {
         final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
         stubProviderTwo.addStore(storeName, secondUnderlying);
@@ -436,15 +528,4 @@ public class CompositeReadOnlyWindowStoreTest {
     public void shouldThrowNPEIfKeyIsNull() {
         assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(0), ofEpochMilli(0)));
     }
-
-    @Test
-    public void shouldThrowNPEIfFromKeyIsNull() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch(null, "a", ofEpochMilli(0), ofEpochMilli(0)));
-    }
-
-    @Test
-    public void shouldThrowNPEIfToKeyIsNull() {
-        assertThrows(NullPointerException.class, () -> windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(0)));
-    }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 649e159..ca6a518 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -274,10 +274,19 @@ public class MeteredWindowStoreTest {
     public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
             .andReturn(KeyValueIterators.emptyIterator());
+        expect(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
+        expect(innerStoreMock.fetch(null, null, 1, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
         replay(innerStoreMock);
 
         store.init((StateStoreContext) context, store);
         store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+        store.fetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+        store.fetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+        store.fetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
 
         // it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
         // and the sensor is tested elsewhere
@@ -306,10 +315,19 @@ public class MeteredWindowStoreTest {
     public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() {
         expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
             .andReturn(KeyValueIterators.emptyIterator());
+        expect(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
+        expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
+        expect(innerStoreMock.backwardFetch(null, null, 1, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
         replay(innerStoreMock);
 
         store.init((StateStoreContext) context, store);
         store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+        store.backwardFetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+        store.backwardFetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
+        store.backwardFetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
 
         // it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor
         // and the sensor is tested elsewhere
@@ -454,27 +472,6 @@ public class MeteredWindowStoreTest {
         assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L));
     }
 
-    @Test
-    public void shouldThrowNullPointerOnFetchRangeIfFromIsNull() {
-        assertThrows(NullPointerException.class, () -> store.fetch(null, "to", 0L, 1L));
-    }
-
-    @Test
-    public void shouldThrowNullPointerOnFetchRangeIfToIsNull() {
-        assertThrows(NullPointerException.class, () -> store.fetch("from", null, 0L, 1L));
-    }
-
-
-    @Test
-    public void shouldThrowNullPointerOnbackwardFetchRangeIfFromIsNull() {
-        assertThrows(NullPointerException.class, () -> store.backwardFetch(null, "to", 0L, 1L));
-    }
-
-    @Test
-    public void shouldThrowNullPointerOnbackwardFetchRangeIfToIsNull() {
-        assertThrows(NullPointerException.class, () -> store.backwardFetch("from", null, 0L, 1L));
-    }
-
     private KafkaMetric metric(final String name) {
         return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 79d2229..752334d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -266,7 +266,19 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
         for (long now = timeFrom.toEpochMilli(); now <= timeTo.toEpochMilli(); now++) {
             final NavigableMap<K, V> kvMap = data.get(now);
             if (kvMap != null) {
-                for (final Entry<K, V> entry : kvMap.subMap(keyFrom, true, keyTo, true).entrySet()) {
+                final NavigableMap<K, V> kvSubMap; // view of kvMap restricted to [keyFrom, keyTo]; a null bound is open-ended
+                if (keyFrom == null && keyTo == null) {
+                    kvSubMap = kvMap;
+                } else if (keyFrom == null) {
+                    kvSubMap = kvMap.headMap(keyTo, true); // no lower bound: everything up to and including keyTo
+                } else if (keyTo == null) {
+                    kvSubMap = kvMap.tailMap(keyFrom, true); // no upper bound: everything from keyFrom onwards
+                } else {
+                    // keyFrom != null and keyTo != null
+                    kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true);
+                }
+
+                for (final Entry<K, V> entry : kvSubMap.entrySet()) {
                     results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
                 }
             }
@@ -297,8 +309,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from,
-                                                          final K to,
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
+                                                          final K keyTo,
                                                           final Instant timeFrom,
                                                           final Instant timeTo) throws IllegalArgumentException {
         final long timeFromTs = ApiUtils.validateMillisecondInstant(timeFrom, prepareMillisCheckFailMsgPrefix(timeFrom, "timeFrom"));
@@ -310,7 +322,19 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
         for (long now = timeToTs; now >= timeFromTs; now--) {
             final NavigableMap<K, V> kvMap = data.get(now);
             if (kvMap != null) {
-                for (final Entry<K, V> entry : kvMap.subMap(from, true, to, true).descendingMap().entrySet()) {
+                final NavigableMap<K, V> kvSubMap; // view of kvMap restricted to [keyFrom, keyTo]; a null bound is open-ended
+                if (keyFrom == null && keyTo == null) {
+                    kvSubMap = kvMap;
+                } else if (keyFrom == null) {
+                    kvSubMap = kvMap.headMap(keyTo, true); // no lower bound: everything up to and including keyTo
+                } else if (keyTo == null) {
+                    kvSubMap = kvMap.tailMap(keyFrom, true); // no upper bound: everything from keyFrom onwards
+                } else {
+                    // keyFrom != null and keyTo != null
+                    kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true);
+                }
+
+                for (final Entry<K, V> entry : kvSubMap.descendingMap().entrySet()) {
                     results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index c7e5924..31ce3c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -112,6 +112,34 @@ public class SegmentIteratorTest {
     }
 
     @Test
+    public void shouldIterateOverAllSegmentsWhenNullKeyFromKeyTo() {
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            null,
+            null,
+            true);
+
+        assertTrue(iterator.hasNext());
+        assertEquals("a", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("c", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("d", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void shouldIterateBackwardOverAllSegments() {
         iterator = new SegmentIterator<>(
             Arrays.asList(segmentTwo, segmentOne).iterator(), //store should pass the segments in the right order
@@ -140,6 +168,34 @@ public class SegmentIteratorTest {
     }
 
     @Test
+    public void shouldIterateBackwardOverAllSegmentsWhenNullKeyFromKeyTo() {
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentTwo, segmentOne).iterator(), //store should pass the segments in the right order
+            hasNextCondition,
+            null,
+            null,
+            false);
+
+        assertTrue(iterator.hasNext());
+        assertEquals("d", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("c", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("a", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
         iterator = new SegmentIterator<>(
             Collections.singletonList(segmentOne).iterator(),
@@ -174,6 +230,47 @@ public class SegmentIteratorTest {
     }
 
     @Test
+    public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyFrom() {
+        // Null lower key bound combined with the upper key bound "b".
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            null,
+            Bytes.wrap("b".getBytes()),
+            false);
+
+        // Expected {key, value} pairs, in the order the iterator must return them.
+        final String[][] expectedRecords = {{"b", "2"}, {"a", "1"}};
+        for (final String[] expected : expectedRecords) {
+            assertTrue(iterator.hasNext());
+            assertEquals(expected[0], new String(iterator.peekNextKey().get()));
+            assertEquals(KeyValue.pair(expected[0], expected[1]), toStringKeyValue(iterator.next()));
+        }
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyTo() {
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("c".getBytes()),
+            null,
+            false);
+
+        // Lower key bound "c", null upper bound: expected {key, value} pairs in iteration order.
+        final String[][] expectedRecords = {{"d", "4"}, {"c", "3"}};
+        for (final String[] expected : expectedRecords) {
+            assertTrue(iterator.hasNext());
+            assertEquals(expected[0], new String(iterator.peekNextKey().get()));
+            assertEquals(KeyValue.pair(expected[0], expected[1]), toStringKeyValue(iterator.next()));
+        }
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void shouldOnlyIterateOverSegmentsInRange() {
         iterator = new SegmentIterator<>(
             Arrays.asList(segmentOne, segmentTwo).iterator(),
@@ -194,6 +291,54 @@ public class SegmentIteratorTest {
     }
 
     @Test
+    public void shouldOnlyIterateOverSegmentsInRangeWhenNullKeyFrom() {
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            null,
+            Bytes.wrap("c".getBytes()),
+            true);
+
+        assertTrue(iterator.hasNext());
+        assertEquals("a", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("c", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldOnlyIterateOverSegmentsInRangeWhenNullKeyTo() {
+        iterator = new SegmentIterator<>(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("b".getBytes()),
+            null,
+            true);
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("c", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("d", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() {
         iterator = new SegmentIterator<>(
             Arrays.asList(segmentOne, segmentTwo).iterator(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
index 833ab6a..a429256 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -39,6 +40,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -54,12 +56,15 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 @RunWith(Parameterized.class)
 public class WindowStoreFetchTest {
@@ -77,6 +82,14 @@ public class WindowStoreFetchTest {
     private LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords;
     private LinkedList<KeyValue<String, String>> records;
     private Properties streamsConfig;
+    private String low;
+    private String high;
+    private String middle;
+    private String innerLow;
+    private String innerHigh;
+    private String innerLowBetween;
+    private String innerHighBetween;
+    private String storeName;
 
     private TimeWindowedKStream<String, String> windowedStream;
 
@@ -98,7 +111,31 @@ public class WindowStoreFetchTest {
             // expected the count of each key is 2
             final long windowStartTime = i < m ? 0 : WINDOW_SIZE;
             expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L));
+            high = key;
+            if (low == null) {
+                low = key;
+            }
+            if (i == m) {
+                middle = key;
+            }
+            if (i == 1) {
+                innerLow = key;
+                final int index = i * 2 - 1;
+                innerLowBetween = "key-" + index;
+            }
+            if (i == DATA_SIZE - 2) {
+                innerHigh = key;
+                final int index = i * 2 + 1;
+                innerHighBetween = "key-" + index;
+            }
         }
+        Assert.assertNotNull(low);
+        Assert.assertNotNull(high);
+        Assert.assertNotNull(middle);
+        Assert.assertNotNull(innerLow);
+        Assert.assertNotNull(innerHigh);
+        Assert.assertNotNull(innerLowBetween);
+        Assert.assertNotNull(innerHighBetween);
     }
 
     @Rule
@@ -161,6 +198,50 @@ public class WindowStoreFetchTest {
 
                 TestUtils.checkEquals(scanIterator, dataIterator);
             }
+
+            try (final KeyValueIterator<Windowed<String>, Long> scanIterator = forward ?
+                stateStore.fetch(null, null, 0, Long.MAX_VALUE) :
+                stateStore.backwardFetch(null, null, 0, Long.MAX_VALUE)) {
+
+                final Iterator<KeyValue<Windowed<String>, Long>> dataIterator = forward ?
+                    expectedRecords.iterator() :
+                    expectedRecords.descendingIterator();
+
+                TestUtils.checkEquals(scanIterator, dataIterator);
+            }
+
+            testRange("range", stateStore, innerLow, innerHigh, forward);
+            testRange("until", stateStore, null, middle, forward);
+            testRange("from", stateStore, middle, null, forward);
+
+            testRange("untilBetween", stateStore, null, innerHighBetween, forward);
+            testRange("fromBetween", stateStore, innerLowBetween, null, forward);
+        }
+    }
+
+    /**
+     * Collects {@code iterator} via {@code Utils.toList} with a predicate that accepts only non-null
+     * entries whose record key lies in the inclusive range [from, to]; a null bound leaves that side open.
+     */
+    private List<KeyValue<Windowed<String>, Long>> filterList(final KeyValueIterator<Windowed<String>, Long> iterator, final String from, final String to) {
+        final Predicate<KeyValue<Windowed<String>, Long>> pred = elem -> {
+            if (elem == null) {
+                return false; // must come first: the key is dereferenced below
+            }
+            final String key = elem.key.key();
+            final boolean notBelowFrom = from == null || key.compareTo(from) >= 0;
+            final boolean notAboveTo = to == null || key.compareTo(to) <= 0;
+            return notBelowFrom && notAboveTo;
+        };
+        return Utils.toList(iterator, pred);
+    }
+
+    /** Asserts that a key-range fetch equals the fetch-all result filtered to [from, to]; {@code name} labels a failure. */
+    private void testRange(final String name, final WindowStore<String, Long> store, final String from, final String to, final boolean forward) {
+        try (final KeyValueIterator<Windowed<String>, Long> resultIterator = forward ? store.fetch(from, to, 0, Long.MAX_VALUE) : store.backwardFetch(from, to, 0, Long.MAX_VALUE);
+             final KeyValueIterator<Windowed<String>, Long> expectedIterator = forward ? store.fetchAll(0, Long.MAX_VALUE) : store.backwardFetchAll(0, Long.MAX_VALUE)) {
+            final List<KeyValue<Windowed<String>, Long>> result = Utils.toList(resultIterator);
+            assertThat(name, result, is(filterList(expectedIterator, from, to)));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index c4c0235..9008982 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -46,6 +47,7 @@ import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
 
 public final class StreamsTestUtils {
     private StreamsTestUtils() {}
@@ -170,6 +172,24 @@ public final class StreamsTestUtils {
         }
     }
 
+    /** Verifies that {@code iterator} yields exactly the expected windowed keys and values, pairwise and in order. */
+    public static void verifyAllWindowedKeyValues(final KeyValueIterator<Windowed<Bytes>, byte[]> iterator,
+                                                  final List<Windowed<Bytes>> expectedKeys,
+                                                  final List<String> expectedValues) {
+        final int keyCount = expectedKeys.size();
+        final int valueCount = expectedValues.size();
+        if (keyCount != valueCount) {
+            throw new IllegalArgumentException("expectedKeys and expectedValues should have the same size. " +
+                "expectedKeys size: " + keyCount + ", expectedValues size: " + valueCount);
+        }
+        int index = 0;
+        while (index < keyCount) {
+            verifyWindowedKeyValue(iterator.next(), expectedKeys.get(index), expectedValues.get(index));
+            index++;
+        }
+        assertFalse(iterator.hasNext());
+    }
+
     public static void verifyWindowedKeyValue(final KeyValue<Windowed<Bytes>, byte[]> actual,
                                               final Windowed<Bytes> expectedKey,
                                               final String expectedValue) {