You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/13 21:43:20 UTC

[kafka] branch trunk updated: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order (#11292)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9628c12  KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order (#11292)
9628c12 is described below

commit 9628c1278e5dcd4f6995c89809461d350d9a530a
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Tue Sep 14 05:40:54 2021 +0800

    KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order (#11292)
    
    When introducing the backward iterator for WindowStore in #9138, we forgot to iterate the records within each segment in reverse order (i.e. via descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.
    
    Currently, in Window store, we store records in [segments -> [records] ].
    
    For example:
    window size = 500,
    input records:
    
    key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
    key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
    key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
    
    So, internally, "a" and "b" will be in the same segment, and "c" in another segment.
    segments: [0 /* window start */, records], [500, records].
    And the records for window start 0 will be "a" and "b".
    the records for window start 500 will be "c".
    
    Before this change, we had a reverse iterator over the segments, but not over the records within each segment. So, when doing backwardFetchAll, the records were returned in the order "c", "a", "b", whereas the correct order is "c", "b", "a".
    
    Reviewers: Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../state/internals/InMemoryWindowStore.java       |  10 +-
 .../internals/AbstractWindowBytesStoreTest.java    | 500 +++++++++------------
 .../state/internals/WindowStoreFetchTest.java      | 229 ++++++++++
 3 files changed, 451 insertions(+), 288 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 7c0d21c..ae37542 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -499,10 +499,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
             currentTime = currentSegment.getKey();
 
-            if (allKeys) {
-                return currentSegment.getValue().entrySet().iterator();
+            final ConcurrentNavigableMap<Bytes, byte[]> subMap = allKeys ?
+                currentSegment.getValue() :
+                currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+
+            if (forward) {
+                return subMap.entrySet().iterator();
             } else {
-                return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
+                return subMap.descendingMap().entrySet().iterator();
             }
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 02db7e7..153db97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -80,6 +80,15 @@ public abstract class AbstractWindowBytesStoreTest {
     static final long SEGMENT_INTERVAL = 60_000L;
     static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL;
 
+    final long defaultStartTime = SEGMENT_INTERVAL - 4L;
+
+    final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime);
+    final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1);
+    final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2);
+    final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2);
+    final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", defaultStartTime + 4);
+    final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", defaultStartTime + 5);
+
     WindowStore<Integer, String> windowStore;
     InternalMockProcessorContext context;
     MockRecordCollector recordCollector;
@@ -119,122 +128,114 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void testRangeAndSinglePointFetch() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
-
-        assertEquals("zero", windowStore.fetch(0, startTime));
-        assertEquals("one", windowStore.fetch(1, startTime + 1L));
-        assertEquals("two", windowStore.fetch(2, startTime + 2L));
-        assertEquals("four", windowStore.fetch(4, startTime + 4L));
-        assertEquals("five", windowStore.fetch(5, startTime + 5L));
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
             new HashSet<>(Collections.singletonList("zero")),
             valuesToSet(windowStore.fetch(
                 0,
-                ofEpochMilli(startTime + 0 - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0 + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 0 - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0 + WINDOW_SIZE))));
 
-        putSecondBatch(windowStore, startTime, context);
+        putSecondBatch(windowStore, defaultStartTime, context);
 
-        assertEquals("two+1", windowStore.fetch(2, startTime + 3L));
-        assertEquals("two+2", windowStore.fetch(2, startTime + 4L));
-        assertEquals("two+3", windowStore.fetch(2, startTime + 5L));
-        assertEquals("two+4", windowStore.fetch(2, startTime + 6L));
-        assertEquals("two+5", windowStore.fetch(2, startTime + 7L));
-        assertEquals("two+6", windowStore.fetch(2, startTime + 8L));
+        assertEquals("two+1", windowStore.fetch(2, defaultStartTime + 3L));
+        assertEquals("two+2", windowStore.fetch(2, defaultStartTime + 4L));
+        assertEquals("two+3", windowStore.fetch(2, defaultStartTime + 5L));
+        assertEquals("two+4", windowStore.fetch(2, defaultStartTime + 6L));
+        assertEquals("two+5", windowStore.fetch(2, defaultStartTime + 7L));
+        assertEquals("two+6", windowStore.fetch(2, defaultStartTime + 8L));
 
         assertEquals(
             new HashSet<>(Collections.emptyList()),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime - 2L - WINDOW_SIZE),
-                ofEpochMilli(startTime - 2L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime - 2L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime - 2L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime - 1L - WINDOW_SIZE),
-                ofEpochMilli(startTime - 1L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime - 1L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime - 1L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime - WINDOW_SIZE),
-                ofEpochMilli(startTime + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 1L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2", "two+3")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 2L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 3L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 4L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 5L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 5L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 6L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 6L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 6L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 6L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+2", "two+3", "two+4", "two+5", "two+6")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 7L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 7L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 7L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 7L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 8L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 8L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 8L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 8L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+4", "two+5", "two+6")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 9L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 9L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 9L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 9L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+5", "two+6")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 10L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 10L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 10L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 10L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two+6")),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 11L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 11L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 11L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 11L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
             valuesToSet(windowStore.fetch(
                 2,
-                ofEpochMilli(startTime + 12L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 12L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -244,14 +245,14 @@ public abstract class AbstractWindowBytesStoreTest {
             changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
         }
 
-        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
 
         assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
         assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
         assertEquals(
             Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"),
             entriesByKey.get(2));
-        assertNull(entriesByKey.get(3));
+        assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
         assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
         assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
         assertNull(entriesByKey.get(6));
@@ -259,42 +260,28 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void shouldGetAll() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
-
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
-            asList(zero, one, two, four, five),
+            asList(zero, one, two, three, four, five),
             toList(windowStore.all())
         );
     }
 
     @Test
     public void shouldGetAllNonDeletedRecords() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
         // Add some records
-        windowStore.put(0, "zero", startTime + 0);
-        windowStore.put(1, "one", startTime + 1);
-        windowStore.put(2, "two", startTime + 2);
-        windowStore.put(3, "three", startTime + 3);
-        windowStore.put(4, "four", startTime + 4);
+        windowStore.put(0, "zero", defaultStartTime + 0);
+        windowStore.put(1, "one", defaultStartTime + 1);
+        windowStore.put(2, "two", defaultStartTime + 2);
+        windowStore.put(3, "three", defaultStartTime + 3);
+        windowStore.put(4, "four", defaultStartTime + 4);
 
         // Delete some records
-        windowStore.put(1, null, startTime + 1);
-        windowStore.put(3, null, startTime + 3);
+        windowStore.put(1, null, defaultStartTime + 1);
+        windowStore.put(3, null, defaultStartTime + 3);
 
         // Only non-deleted records should appear in the all() iterator
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-
         assertEquals(
             asList(zero, two, four),
             toList(windowStore.all())
@@ -303,21 +290,15 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void shouldGetAllReturnTimestampOrderedRecords() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
         // Add some records in different order
-        windowStore.put(4, "four", startTime + 4);
-        windowStore.put(0, "zero", startTime + 0);
-        windowStore.put(2, "two", startTime + 2);
-        windowStore.put(3, "three", startTime + 3);
-        windowStore.put(1, "one", startTime + 1);
+        windowStore.put(4, "four", defaultStartTime + 4);
+        windowStore.put(0, "zero", defaultStartTime + 0);
+        windowStore.put(2, "two", defaultStartTime + 2);
+        windowStore.put(3, "three", defaultStartTime + 3);
+        windowStore.put(1, "one", defaultStartTime + 1);
 
         // Only non-deleted records should appear in the all() iterator
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", startTime + 3);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+        final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 3);
 
         assertEquals(
             asList(zero, one, two, three, four),
@@ -327,13 +308,8 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void shouldEarlyClosedIteratorStillGetAllRecords() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        windowStore.put(0, "zero", startTime + 0);
-        windowStore.put(1, "one", startTime + 1);
-
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+        windowStore.put(0, "zero", defaultStartTime + 0);
+        windowStore.put(1, "one", defaultStartTime + 1);
 
         final KeyValueIterator<Windowed<Integer>, String> it = windowStore.all();
         assertEquals(zero, it.next());
@@ -348,302 +324,260 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void shouldGetBackwardAll() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
-
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
-            asList(five, four, two, one, zero),
+            asList(five, four, three, two, one, zero),
             toList(windowStore.backwardAll())
         );
     }
 
     @Test
     public void shouldFetchAllInTimeRange() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
-
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
-            asList(one, two, four),
-            toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4)))
+            asList(one, two, three, four),
+            toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 4)))
         );
         assertEquals(
-            asList(zero, one, two),
-            toList(windowStore.fetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3)))
+            asList(zero, one, two, three),
+            toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
         );
         assertEquals(
-            asList(one, two, four, five),
-            toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5)))
+            asList(one, two, three, four, five),
+            toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 5)))
         );
     }
 
     @Test
     public void shouldBackwardFetchAllInTimeRange() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
-
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
-            asList(four, two, one),
-            toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4)))
+            asList(four, three, two, one),
+            toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 4)))
         );
         assertEquals(
-            asList(two, one, zero),
-            toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3)))
+            asList(three, two, one, zero),
+            toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
         );
         assertEquals(
-            asList(five, four, two, one),
-            toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5)))
+            asList(five, four, three, two, one),
+            toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 5)))
         );
     }
 
     @Test
     public void testFetchRange() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
-
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
             asList(zero, one),
             toList(windowStore.fetch(
                 0,
                 1,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
             Collections.singletonList(one),
             toList(windowStore.fetch(
                 1,
                 1,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
-            asList(one, two),
+            asList(one, two, three),
             toList(windowStore.fetch(
                 1,
                 3,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
-            asList(zero, one, two),
+            asList(zero, one, two, three),
             toList(windowStore.fetch(
                 0,
                 5,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
-            asList(zero, one, two, four, five),
+            asList(zero, one, two, three, four, five),
             toList(windowStore.fetch(
                 0,
                 5,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
         );
         assertEquals(
-            asList(two, four, five),
+            asList(two, three, four, five),
             toList(windowStore.fetch(
                 0,
                 5,
-                ofEpochMilli(startTime + 2L),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+                ofEpochMilli(defaultStartTime + 2L),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
         );
         assertEquals(
             Collections.emptyList(),
             toList(windowStore.fetch(
                 4,
                 5,
-                ofEpochMilli(startTime + 2L),
-                ofEpochMilli(startTime + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 2L),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE)))
         );
         assertEquals(
             Collections.emptyList(),
             toList(windowStore.fetch(
                 0,
                 3,
-                ofEpochMilli(startTime + 3L),
-                ofEpochMilli(startTime + WINDOW_SIZE + 5)))
+                ofEpochMilli(defaultStartTime + 3L),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
         );
     }
 
     @Test
     public void testBackwardFetchRange() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
-
-        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
-        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
-        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
-        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
-        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
             asList(one, zero),
             toList(windowStore.backwardFetch(
                 0,
                 1,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
             Collections.singletonList(one),
             toList(windowStore.backwardFetch(
                 1,
                 1,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
-            asList(two, one),
+            asList(three, two, one),
             toList(windowStore.backwardFetch(
                 1,
                 3,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
-            asList(two, one, zero),
+            asList(three, two, one, zero),
             toList(windowStore.backwardFetch(
                 0,
                 5,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))
         );
         assertEquals(
-            asList(five, four, two, one, zero),
+            asList(five, four, three, two, one, zero),
             toList(windowStore.backwardFetch(
                 0,
                 5,
-                ofEpochMilli(startTime + 0L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+                ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
         );
         assertEquals(
-            asList(five, four, two),
+            asList(five, four, three, two),
             toList(windowStore.backwardFetch(
                 0,
                 5,
-                ofEpochMilli(startTime + 2L),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L)))
+                ofEpochMilli(defaultStartTime + 2L),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L)))
         );
         assertEquals(
             Collections.emptyList(),
             toList(windowStore.backwardFetch(
                 4,
                 5,
-                ofEpochMilli(startTime + 2L),
-                ofEpochMilli(startTime + WINDOW_SIZE)))
+                ofEpochMilli(defaultStartTime + 2L),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE)))
         );
         assertEquals(
             Collections.emptyList(),
             toList(windowStore.backwardFetch(
                 0,
                 3,
-                ofEpochMilli(startTime + 3L),
-                ofEpochMilli(startTime + WINDOW_SIZE + 5)))
+                ofEpochMilli(defaultStartTime + 3L),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5)))
         );
     }
 
     @Test
     public void testPutAndFetchBefore() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
             new HashSet<>(Collections.singletonList("zero")),
-            valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime + 0L - WINDOW_SIZE), ofEpochMilli(startTime + 0L))));
+            valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 0L))));
         assertEquals(
             new HashSet<>(Collections.singletonList("one")),
-            valuesToSet(windowStore.fetch(1, ofEpochMilli(startTime + 1L - WINDOW_SIZE), ofEpochMilli(startTime + 1L))));
+            valuesToSet(windowStore.fetch(1, ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 1L))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L - WINDOW_SIZE), ofEpochMilli(startTime + 2L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 2L))));
         assertEquals(
-            new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(3, ofEpochMilli(startTime + 3L - WINDOW_SIZE), ofEpochMilli(startTime + 3L))));
+            new HashSet<>(Collections.singletonList("three")),
+            valuesToSet(windowStore.fetch(3, ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 3L))));
         assertEquals(
             new HashSet<>(Collections.singletonList("four")),
-            valuesToSet(windowStore.fetch(4, ofEpochMilli(startTime + 4L - WINDOW_SIZE), ofEpochMilli(startTime + 4L))));
+            valuesToSet(windowStore.fetch(4, ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L))));
         assertEquals(
             new HashSet<>(Collections.singletonList("five")),
-            valuesToSet(windowStore.fetch(5, ofEpochMilli(startTime + 5L - WINDOW_SIZE), ofEpochMilli(startTime + 5L))));
+            valuesToSet(windowStore.fetch(5, ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 5L))));
 
-        putSecondBatch(windowStore, startTime, context);
+        putSecondBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 1L - WINDOW_SIZE), ofEpochMilli(startTime - 1L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime - 1L))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 0L - WINDOW_SIZE), ofEpochMilli(startTime + 0L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 0L))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 1L - WINDOW_SIZE), ofEpochMilli(startTime + 1L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 1L))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L - WINDOW_SIZE), ofEpochMilli(startTime + 2L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 2L))));
         assertEquals(
             new HashSet<>(asList("two", "two+1")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 3L - WINDOW_SIZE), ofEpochMilli(startTime + 3L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 3L))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 4L - WINDOW_SIZE), ofEpochMilli(startTime + 4L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2", "two+3")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 5L - WINDOW_SIZE), ofEpochMilli(startTime + 5L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 5L))));
         assertEquals(
             new HashSet<>(asList("two+1", "two+2", "two+3", "two+4")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 6L - WINDOW_SIZE), ofEpochMilli(startTime + 6L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 6L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 6L))));
         assertEquals(
             new HashSet<>(asList("two+2", "two+3", "two+4", "two+5")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 7L - WINDOW_SIZE), ofEpochMilli(startTime + 7L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 7L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 7L))));
         assertEquals(
             new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 8L - WINDOW_SIZE), ofEpochMilli(startTime + 8L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 8L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 8L))));
         assertEquals(
             new HashSet<>(asList("two+4", "two+5", "two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 9L - WINDOW_SIZE), ofEpochMilli(startTime + 9L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 9L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 9L))));
         assertEquals(
             new HashSet<>(asList("two+5", "two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 10L - WINDOW_SIZE), ofEpochMilli(startTime + 10L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 10L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 10L))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 11L - WINDOW_SIZE), ofEpochMilli(startTime + 11L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 11L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 11L))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 12L - WINDOW_SIZE), ofEpochMilli(startTime + 12L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 12L))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 13L - WINDOW_SIZE), ofEpochMilli(startTime + 13L))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 13L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 13L))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -653,11 +587,11 @@ public abstract class AbstractWindowBytesStoreTest {
             changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
         }
 
-        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
         assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
         assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
         assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
-        assertNull(entriesByKey.get(3));
+        assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
         assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
         assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
         assertNull(entriesByKey.get(6));
@@ -665,97 +599,95 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void testPutAndFetchAfter() {
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        putFirstBatch(windowStore, startTime, context);
+        putFirstBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
             new HashSet<>(Collections.singletonList("zero")),
-            valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime + 0L),
-                ofEpochMilli(startTime + 0L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime + 0L),
+                ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("one")),
-            valuesToSet(windowStore.fetch(1, ofEpochMilli(startTime + 1L),
-                ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(1, ofEpochMilli(defaultStartTime + 1L),
+                ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L),
-                ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L),
+                ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(3, ofEpochMilli(startTime + 3L),
-                ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(3, ofEpochMilli(defaultStartTime + 3L),
+                ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("four")),
-            valuesToSet(windowStore.fetch(4, ofEpochMilli(startTime + 4L),
-                ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(4, ofEpochMilli(defaultStartTime + 4L),
+                ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("five")),
-            valuesToSet(windowStore.fetch(5, ofEpochMilli(startTime + 5L),
-                ofEpochMilli(startTime + 5L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(5, ofEpochMilli(defaultStartTime + 5L),
+                ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE))));
 
-        putSecondBatch(windowStore, startTime, context);
+        putSecondBatch(windowStore, defaultStartTime, context);
 
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 2L),
-                ofEpochMilli(startTime - 2L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 2L),
+                ofEpochMilli(defaultStartTime - 2L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 1L),
-                ofEpochMilli(startTime - 1L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 1L),
+                ofEpochMilli(defaultStartTime - 1L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1")),
             valuesToSet(windowStore
-                .fetch(2, ofEpochMilli(startTime), ofEpochMilli(startTime + WINDOW_SIZE))));
+                .fetch(2, ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 1L),
-                ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 1L),
+                ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two", "two+1", "two+2", "two+3")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L),
-                ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L),
+                ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+1", "two+2", "two+3", "two+4")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 3L),
-                ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 3L),
+                ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+2", "two+3", "two+4", "two+5")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 4L),
-                ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 4L),
+                ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 5L),
-                ofEpochMilli(startTime + 5L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 5L),
+                ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+4", "two+5", "two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 6L),
-                ofEpochMilli(startTime + 6L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 6L),
+                ofEpochMilli(defaultStartTime + 6L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("two+5", "two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 7L),
-                ofEpochMilli(startTime + 7L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 7L),
+                ofEpochMilli(defaultStartTime + 7L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.singletonList("two+6")),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 8L),
-                ofEpochMilli(startTime + 8L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 8L),
+                ofEpochMilli(defaultStartTime + 8L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 9L),
-                ofEpochMilli(startTime + 9L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 9L),
+                ofEpochMilli(defaultStartTime + 9L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 10L),
-                ofEpochMilli(startTime + 10L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 10L),
+                ofEpochMilli(defaultStartTime + 10L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 11L),
-                ofEpochMilli(startTime + 11L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 11L),
+                ofEpochMilli(defaultStartTime + 11L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 12L),
-                ofEpochMilli(startTime + 12L + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L),
+                ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -765,14 +697,14 @@ public abstract class AbstractWindowBytesStoreTest {
             changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
         }
 
-        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
 
         assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
         assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
         assertEquals(
             Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"),
             entriesByKey.get(2));
-        assertNull(entriesByKey.get(3));
+        assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
         assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
         assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
         assertNull(entriesByKey.get(6));
@@ -784,49 +716,47 @@ public abstract class AbstractWindowBytesStoreTest {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
         windowStore.init((StateStoreContext) context, windowStore);
 
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        windowStore.put(0, "zero", startTime);
+        windowStore.put(0, "zero", defaultStartTime);
 
         assertEquals(
             new HashSet<>(Collections.singletonList("zero")),
-            valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE),
-                ofEpochMilli(startTime + WINDOW_SIZE))));
+            valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
 
-        windowStore.put(0, "zero", startTime);
-        windowStore.put(0, "zero+", startTime);
-        windowStore.put(0, "zero++", startTime);
+        windowStore.put(0, "zero", defaultStartTime);
+        windowStore.put(0, "zero+", defaultStartTime);
+        windowStore.put(0, "zero++", defaultStartTime);
 
         assertEquals(
             new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
             valuesToSet(windowStore.fetch(
                 0,
-                ofEpochMilli(startTime - WINDOW_SIZE),
-                ofEpochMilli(startTime + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
             valuesToSet(windowStore.fetch(
                 0,
-                ofEpochMilli(startTime + 1L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
             valuesToSet(windowStore.fetch(
                 0,
-                ofEpochMilli(startTime + 2L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
             valuesToSet(windowStore.fetch(
                 0,
-                ofEpochMilli(startTime + 3L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE))));
         assertEquals(
             new HashSet<>(Collections.emptyList()),
             valuesToSet(windowStore.fetch(
                 0,
-                ofEpochMilli(startTime + 4L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
+                ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE),
+                ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -836,7 +766,7 @@ public abstract class AbstractWindowBytesStoreTest {
             changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
         }
 
-        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, defaultStartTime);
 
         assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
     }
@@ -903,7 +833,6 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void testDeleteAndUpdate() {
-
         final long currentTime = 0;
         windowStore.put(1, "one", currentTime);
         windowStore.put(1, "one v2", currentTime);
@@ -1176,6 +1105,7 @@ public abstract class AbstractWindowBytesStoreTest {
         store.put(0, "zero", startTime);
         store.put(1, "one", startTime + 1L);
         store.put(2, "two", startTime + 2L);
+        store.put(3, "three", startTime + 2L);
         store.put(4, "four", startTime + 4L);
         store.put(5, "five", startTime + 5L);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
new file mode 100644
index 0000000..833ab6a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * Checks that {@code fetchAll} and {@code backwardFetchAll} on a windowed count store return the expected
+ * records in the expected order, for each combination of store type, logging, caching and scan direction.
+ */
+@RunWith(Parameterized.class)
+public class WindowStoreFetchTest {
+    private enum StoreType { InMemory, RocksDB, Timed }
+
+    private static final String STORE_NAME = "store";
+    private static final int DATA_SIZE = 5;
+    private static final long WINDOW_SIZE = 500L;
+    private static final long RETENTION_MS = 10000L;
+
+    private final StoreType storeType;
+    private final boolean enableLogging;
+    private final boolean enableCaching;
+    private final boolean forward;
+
+    private final LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords = new LinkedList<>();
+    private final LinkedList<KeyValue<String, String>> records = new LinkedList<>();
+    private Properties streamsConfig;
+
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    public WindowStoreFetchTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) {
+        this.storeType = storeType;
+        this.enableLogging = enableLogging;
+        this.enableCaching = enableCaching;
+        this.forward = forward;
+
+        // the first DATA_SIZE / 2 keys are expected in the window starting at 0, the others in the next window
+        final int m = DATA_SIZE / 2;
+        for (int i = 0; i < DATA_SIZE; i++) {
+            final String key = "key-" + i * 2;
+            final String value = "val-" + i * 2;
+            final KeyValue<String, String> r = new KeyValue<>(key, value);
+            // each record is queued twice, so the expected count of each key is 2
+            records.add(r);
+            records.add(r);
+            final long windowStartTime = i < m ? 0 : WINDOW_SIZE;
+            expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L));
+        }
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}")
+    public static Collection<Object[]> data() {
+        final List<StoreType> types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed);
+        final List<Boolean> logging = Arrays.asList(true, false);
+        final List<Boolean> caching = Arrays.asList(true, false);
+        final List<Boolean> forward = Arrays.asList(true, false);
+        return buildParameters(types, logging, caching, forward);
+    }
+
+    @Before
+    public void setup() {
+        streamsConfig = mkProperties(mkMap(
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+        ));
+    }
+
+    @Test
+    public void testStoreConfig() {
+        // the argument order must match getStoreConfig(type, name, cachingEnabled, loggingEnabled)
+        final Materialized<String, Long, WindowStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableCaching, enableLogging);
+        // create topology: windowed count of the input topic, materialized in the store under test
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));
+        stream
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
+            .count(stateStoreConfig)
+            .toStream()
+            .to("output");
+
+        final Topology topology = builder.build();
+
+        // hand streamsConfig to the driver so that the state directory configured in setup() is actually used
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            // get input topic and stateStore
+            final TestInputTopic<String, String> input = driver
+                    .createInputTopic("input", new StringSerializer(), new StringSerializer());
+            final WindowStore<String, Long> stateStore = driver.getWindowStore(STORE_NAME);
+
+            // write some data: the first `medium` records use timestamps based at 0, the rest based at WINDOW_SIZE
+            final int medium = DATA_SIZE / 2 * 2;
+            int i = 0;
+            for (final KeyValue<String, String> kv : records) {
+                final long windowStartTime = i < medium ? 0 : WINDOW_SIZE;
+                input.pipeInput(kv.key, kv.value, windowStartTime + i);
+                i++;
+            }
+
+            // query the state store
+            try (final KeyValueIterator<Windowed<String>, Long> scanIterator = forward ?
+                stateStore.fetchAll(0, Long.MAX_VALUE) :
+                stateStore.backwardFetchAll(0, Long.MAX_VALUE)) {
+
+                final Iterator<KeyValue<Windowed<String>, Long>> dataIterator = forward ?
+                    expectedRecords.iterator() :
+                    expectedRecords.descendingIterator();
+
+                TestUtils.checkEquals(scanIterator, dataIterator);
+            }
+        }
+    }
+
+    // builds the cartesian product of the given option lists, one Object[] per combination
+    private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
+        List<Object[]> result = new LinkedList<>();
+        result.add(new Object[0]);
+
+        for (final List<?> argOption : argOptions) {
+            result = times(result, argOption);
+        }
+
+        return result;
+    }
+
+    // extends every argument array in `left` with each element of `right`
+    private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
+        final List<Object[]> result = new LinkedList<>();
+        for (final Object[] args : left) {
+            for (final Object rightElem : right) {
+                final Object[] resArgs = Arrays.copyOf(args, args.length + 1);
+                resArgs[args.length] = rightElem;
+                result.add(resArgs);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Builds the Materialized config for the given store type, backed by a store called {@code name},
+     * with caching and logging switched on or off as requested.
+     */
+    private Materialized<String, Long, WindowStore<Bytes, byte[]>> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) {
+        final Duration retention = Duration.ofMillis(RETENTION_MS);
+        final Duration windowSize = Duration.ofMillis(WINDOW_SIZE);
+        final Supplier<WindowBytesStoreSupplier> createStore = () -> {
+            if (type == StoreType.RocksDB) {
+                return Stores.persistentWindowStore(name, retention, windowSize, false);
+            } else if (type == StoreType.Timed) {
+                return Stores.persistentTimestampedWindowStore(name, retention, windowSize, false);
+            } else {
+                // StoreType.InMemory, which is also the fallback for any other value
+                return Stores.inMemoryWindowStore(name, retention, windowSize, false);
+            }
+        };
+
+        final Materialized<String, Long, WindowStore<Bytes, byte[]>> stateStoreConfig = Materialized
+                .<String, Long>as(createStore.get())
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.Long());
+        // continue with the instance returned by each with* call instead of discarding it
+        final Materialized<String, Long, WindowStore<Bytes, byte[]>> withCaching = cachingEnabled ?
+            stateStoreConfig.withCachingEnabled() :
+            stateStoreConfig.withCachingDisabled();
+        return loggingEnabled ?
+            withCaching.withLoggingEnabled(new HashMap<>()) :
+            withCaching.withLoggingDisabled();
+    }
+}