You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/10/13 01:40:10 UTC

[kafka] branch trunk updated: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess… (#11211)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 07c10024890 KAFKA-12960: Enforcing strict retention time for WindowStore and Sess… (#11211)
07c10024890 is described below

commit 07c10024890c0761ba47ccd4d6301e8101d8c8de
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Thu Oct 13 07:09:58 2022 +0530

    KAFKA-12960: Enforcing strict retention time for WindowStore and Sess… (#11211)
    
    WindowStore and SessionStore do not implement a strict retention time in general. We should consider making retention time strict: even if we still have some records in the store (due to the segmented implementation), we might want to filter expired records on read. This might benefit PAPI users.
    
    This PR adds the filtering behaviour in the Metered store so that it is automatically applied in cases where a custom state store is implemented.
    
    Reviewer: Luke Chen <sh...@gmail.com>, A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <mj...@apache.org>
---
 ...stractDualSchemaRocksDBSegmentedBytesStore.java |  47 ++++-
 .../AbstractRocksDBSegmentedBytesStore.java        |  75 ++++++--
 ...tractRocksDBTimeOrderedSegmentedBytesStore.java |  70 +++++---
 .../internals/RocksDBSegmentedBytesStore.java      |   2 +-
 .../RocksDBTimestampedSegmentedBytesStore.java     |   2 +-
 .../TimeWindowedKStreamIntegrationTest.java        |  66 ++++---
 .../KStreamSlidingWindowAggregateTest.java         |  18 +-
 .../internals/KStreamWindowAggregateTest.java      |  61 ++++---
 .../internals/SessionWindowedKStreamImplTest.java  |  63 +++++--
 .../internals/TimeWindowedKStreamImplTest.java     | 121 +++++++++----
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 190 +++++++++++++--------
 .../AbstractRocksDBSegmentedBytesStoreTest.java    | 163 +++++++++---------
 .../internals/AbstractWindowBytesStoreTest.java    |   4 +-
 .../CachingPersistentWindowStoreTest.java          |   3 +-
 .../state/internals/MeteredSessionStoreTest.java   |  49 ++++++
 .../MeteredTimestampedWindowStoreTest.java         |   1 +
 .../state/internals/MeteredWindowStoreTest.java    |  14 ++
 .../state/internals/RocksDBSessionStoreTest.java   |  93 ++++++++++
 .../state/internals/RocksDBWindowStoreTest.java    | 143 ++++++++++++----
 19 files changed, 835 insertions(+), 350 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 95c1d8d8c81..b446a52eb5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -52,6 +52,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     protected final AbstractSegments<S> segments;
     protected final KeySchema baseKeySchema;
     protected final Optional<KeySchema> indexKeySchema;
+    private final long retentionPeriod;
 
 
     protected ProcessorContext context;
@@ -66,22 +67,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
                                                  final KeySchema baseKeySchema,
                                                  final Optional<KeySchema> indexKeySchema,
-                                                 final AbstractSegments<S> segments) {
+                                                 final AbstractSegments<S> segments,
+                                                 final long retentionPeriod) {
         this.name = name;
         this.baseKeySchema = baseKeySchema;
         this.indexKeySchema = indexKeySchema;
         this.segments = segments;
+        this.retentionPeriod = retentionPeriod;
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
+
+        final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
         final List<S> searchSpace = segments.allSegments(true);
-        final Bytes from = baseKeySchema.lowerRange(null, 0);
+        final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
         final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
+                baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
                 from,
                 to,
                 true);
@@ -89,13 +95,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
 
     @Override
     public KeyValueIterator<Bytes, byte[]> backwardAll() {
+
+        final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
         final List<S> searchSpace = segments.allSegments(false);
-        final Bytes from = baseKeySchema.lowerRange(null, 0);
+        final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
         final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
+                baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
                 from,
                 to,
                 false);
@@ -119,6 +128,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
 
     abstract protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue);
 
+    // isTimeFirstWindowSchema == true implies ON_WINDOW_CLOSE semantics. That case has an edge
+    // case when retentionPeriod equals the grace period: if we added 1, actualFrom would be
+    // greater than `to` and no records would be returned, so the +1 is skipped for that schema.
+    protected long getActualFrom(final long from, final boolean isTimeFirstWindowSchema) {
+        return isTimeFirstWindowSchema ? Math.max(from, observedStreamTime - retentionPeriod) :
+                Math.max(from, observedStreamTime - retentionPeriod + 1);
+
+    }
+
     // For testing
     void putIndex(final Bytes indexKey, final byte[] value) {
         if (!hasIndex()) {
@@ -191,7 +209,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
 
     @Override
     public byte[] get(final Bytes rawKey) {
-        final S segment = segments.getSegmentForTimestamp(baseKeySchema.segmentTimestamp(rawKey));
+        final long timestampFromRawKey = baseKeySchema.segmentTimestamp(rawKey);
+        // check if timestamp is expired
+
+        if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
+            if (timestampFromRawKey < observedStreamTime - retentionPeriod) {
+                LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
+                        rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod);
+                return null;
+            }
+        } else {
+            if (timestampFromRawKey < observedStreamTime - retentionPeriod + 1) {
+                LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
+                        rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod + 1);
+                return null;
+            }
+        }
+
+        final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
         if (segment == null) {
             return null;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 13f914d075a..bcfe30b30e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -52,6 +52,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     private final String name;
     private final AbstractSegments<S> segments;
     private final String metricScope;
+    private final long retentionPeriod;
     private final KeySchema keySchema;
 
     private ProcessorContext context;
@@ -65,10 +66,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
 
     AbstractRocksDBSegmentedBytesStore(final String name,
                                        final String metricScope,
+                                       final long retentionPeriod,
                                        final KeySchema keySchema,
                                        final AbstractSegments<S> segments) {
         this.name = name;
         this.metricScope = metricScope;
+        this.retentionPeriod = retentionPeriod;
         this.keySchema = keySchema;
         this.segments = segments;
     }
@@ -91,19 +94,30 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
                                           final long from,
                                           final long to,
                                           final boolean forward) {
-        final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
+        final long actualFrom = getActualFrom(from);
 
-        final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
+        if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+            LOG.debug("Returning no records for key {} as to ({}) < actualFrom ({}) ", key.toString(), to, actualFrom);
+            return KeyValueIterators.emptyIterator();
+        }
+
+        final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);
+
+        final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, actualFrom);
         final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                keySchema.hasNextCondition(key, key, from, to, forward),
+                keySchema.hasNextCondition(key, key, actualFrom, to, forward),
                 binaryFrom,
                 binaryTo,
                 forward);
     }
 
+    private long getActualFrom(final long from) {
+        return Math.max(from, observedStreamTime - retentionPeriod + 1);
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
                                                  final Bytes keyTo,
@@ -133,14 +147,21 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
             return KeyValueIterators.emptyIterator();
         }
 
-        final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
+        final long actualFrom = getActualFrom(from);
 
-        final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
+        if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+            LOG.debug("Returning no records for keys {}/{} as to ({}) < actualFrom ({}) ", keyFrom, keyTo, to, actualFrom);
+            return KeyValueIterators.emptyIterator();
+        }
+
+        final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);
+
+        final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, actualFrom);
         final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                keySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
+                keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
                 binaryFrom,
                 binaryTo,
                 forward);
@@ -148,11 +169,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
 
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
-        final List<S> searchSpace = segments.allSegments(true);
+        final long actualFrom = getActualFrom(0);
+        final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, true);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
+                keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
                 null,
                 null,
                 true);
@@ -160,11 +182,13 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
 
     @Override
     public KeyValueIterator<Bytes, byte[]> backwardAll() {
-        final List<S> searchSpace = segments.allSegments(false);
+        final long actualFrom = getActualFrom(0);
+
+        final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, false);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
+                keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
                 null,
                 null,
                 false);
@@ -173,11 +197,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     @Override
     public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
                                                     final long timeTo) {
-        final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);
+        final long actualFrom = getActualFrom(timeFrom);
+
+        if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
+            LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
+            return KeyValueIterators.emptyIterator();
+        }
+
+        final List<S> searchSpace = segments.segments(actualFrom, timeTo, true);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                keySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
+                keySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
                 null,
                 null,
                 true);
@@ -186,11 +217,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     @Override
     public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
                                                             final long timeTo) {
-        final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);
+        final long actualFrom = getActualFrom(timeFrom);
+
+        if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
+            LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
+            return KeyValueIterators.emptyIterator();
+        }
+
+        final List<S> searchSpace = segments.segments(actualFrom, timeTo, false);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                keySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
+                keySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
                 null,
                 null,
                 false);
@@ -234,7 +272,14 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
 
     @Override
     public byte[] get(final Bytes key) {
-        final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final long timestampFromKey = keySchema.segmentTimestamp(key);
+        // check if timestamp is expired
+        if (timestampFromKey < observedStreamTime - retentionPeriod + 1) {
+            LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
+                    key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
+            return null;
+        }
+        final S segment = segments.getSegmentForTimestamp(timestampFromKey);
         if (segment == null) {
             return null;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index 0398f0ca060..f8217c6d066 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -87,7 +87,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
 
                 cachedValue = get(baseKey);
                 if (cachedValue == null) {
-                    // Key not in base store, inconsistency happened and remove from index.
+                    // Key not in base store or key is expired, inconsistency happened and remove from index.
                     indexIterator.next();
                     AbstractRocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(key);
                 } else {
@@ -118,7 +118,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
                                                   final KeySchema baseKeySchema,
                                                   final Optional<KeySchema> indexKeySchema) {
         super(name, baseKeySchema, indexKeySchema,
-            new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+            new KeyValueSegments(name, metricsScope, retention, segmentInterval), retention);
     }
 
     @Override
@@ -141,28 +141,38 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
                                           final long from,
                                           final long to,
                                           final boolean forward) {
+
+        final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+        if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
+            return KeyValueIterators.emptyIterator();
+        }
+
         if (indexKeySchema.isPresent()) {
-            final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, forward);
+            final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
+                forward);
 
-            final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, from);
+            final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, actualFrom);
             final Bytes binaryTo = indexKeySchema.get().upperRangeFixedSize(key, to);
 
             return getIndexToBaseStoreIterator(new SegmentIterator<>(
                 searchSpace.iterator(),
-                indexKeySchema.get().hasNextCondition(key, key, from, to, forward),
+                indexKeySchema.get().hasNextCondition(key, key, actualFrom, to, forward),
                 binaryFrom,
                 binaryTo,
                 forward));
         }
 
-        final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, forward);
 
-        final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from);
+        final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to,
+            forward);
+
+        final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, actualFrom);
         final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to);
 
         return new SegmentIterator<>(
             searchSpace.iterator(),
-            baseKeySchema.hasNextCondition(key, key, from, to, forward),
+            baseKeySchema.hasNextCondition(key, key, actualFrom, to, forward),
             binaryFrom,
             binaryTo,
             forward);
@@ -197,30 +207,36 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
             return KeyValueIterators.emptyIterator();
         }
 
+        final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+        if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
+            return KeyValueIterators.emptyIterator();
+        }
+
         if (indexKeySchema.isPresent()) {
-            final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to,
+            final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
                 forward);
 
-            final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, from);
+            final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, actualFrom);
             final Bytes binaryTo = indexKeySchema.get().upperRange(keyTo, to);
 
             return getIndexToBaseStoreIterator(new SegmentIterator<>(
                 searchSpace.iterator(),
-                indexKeySchema.get().hasNextCondition(keyFrom, keyTo, from, to, forward),
+                indexKeySchema.get().hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
                 binaryFrom,
                 binaryTo,
                 forward));
         }
 
-        final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to,
+        final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to,
             forward);
 
-        final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, from);
+        final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, actualFrom);
         final Bytes binaryTo = baseKeySchema.upperRange(keyTo, to);
 
         return new SegmentIterator<>(
             searchSpace.iterator(),
-            baseKeySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
+            baseKeySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
             binaryFrom,
             binaryTo,
             forward);
@@ -235,13 +251,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
     @Override
     public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
                                                     final long timeTo) {
-        final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo, true);
-        final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
+
+        final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+        if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
+            return KeyValueIterators.emptyIterator();
+        }
+
+        final List<KeyValueSegment> searchSpace = segments.segments(actualFrom, timeTo, true);
+        final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
         final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
+                baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
                 binaryFrom,
                 binaryTo,
                 true);
@@ -250,13 +273,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
     @Override
     public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
                                                             final long timeTo) {
-        final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo, false);
-        final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
+
+        final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+        if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
+            return KeyValueIterators.emptyIterator();
+        }
+
+        final List<KeyValueSegment> searchSpace = segments.segments(actualFrom, timeTo, false);
+        final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
         final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
 
         return new SegmentIterator<>(
                 searchSpace.iterator(),
-                baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
+                baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
                 binaryFrom,
                 binaryTo,
                 false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 6c72fa64c5f..e7b7198d1cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesSto
                                final long retention,
                                final long segmentInterval,
                                final KeySchema keySchema) {
-        super(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+        super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
index 7fd958c2c27..39f493c761b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmen
                                           final long retention,
                                           final long segmentInterval,
                                           final KeySchema keySchema) {
-        super(name, metricsScope, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
+        super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index e0ec6c9bbef..752bbfe4b63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -64,6 +64,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Collections;
 
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
@@ -195,22 +196,25 @@ public class TimeWindowedKStreamIntegrationTest {
 
         startStreams();
 
+        // on window close
+        // observedStreamTime : 10, retentionPeriod: 10, actualFrom: 0, timeTo: 0, timeFrom: 0
+        // observedStreamTime : 15, retentionPeriod: 10, actualFrom: 5, timeTo: 5, timeFrom: 1
+        // observedStreamTime : 25, retentionPeriod: 10, actualFrom: 15, timeTo: 15, timeFrom: 6
+
         final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
-            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
-            new StringDeserializer(),
-            10L,
-            String.class,
-            emitFinal ? 6 : 12);
+                new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+                new StringDeserializer(),
+                10L,
+                String.class,
+                emitFinal ? 4 : 12);
 
         final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
         if (emitFinal) {
             expectResult = asList(
-                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
-                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
-                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
-                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
-                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15),
-                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15)
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15)
             );
         } else {
             expectResult = asList(
@@ -260,20 +264,22 @@ public class TimeWindowedKStreamIntegrationTest {
 
         startStreams();
 
+        // on window close
+        // observedStreamTime : 15, retentionPeriod: 15, actualFrom: 0, timeTo: 0, timeFrom: 0
+        // observedStreamTime : 25, retentionPeriod: 15, actualFrom: 10, timeTo: 10, timeFrom: 1
+
         final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
             new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
             new StringDeserializer(),
             10L,
             String.class,
-            emitFinal ? 6 : 13);
+            emitFinal ? 4 : 13);
 
         final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
         if (emitFinal) {
             expectResult = asList(
                 new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
                 new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6),
-                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
-                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
                 new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
                 new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15)
             );
@@ -342,12 +348,13 @@ public class TimeWindowedKStreamIntegrationTest {
 
         startStreams();
 
+        // ON_WINDOW_CLOSE expires all records.
         List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
             new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
             new StringDeserializer(),
             10L,
             String.class,
-            emitFinal ? 5 : 9);
+            emitFinal ? 4 : 9);
 
         List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
         if (emitFinal) {
@@ -358,8 +365,6 @@ public class TimeWindowedKStreamIntegrationTest {
                     5),
                 new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2",
                     11),
-                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)),
-                    "0+L2,R2+L2,R2", 15),
                 new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)),
                     "0+L2,R2", 15)
             );
@@ -403,22 +408,27 @@ public class TimeWindowedKStreamIntegrationTest {
         // Restart
         startStreams();
 
-        windowedMessages = receiveMessagesWithTimestamp(
-            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
-            new StringDeserializer(),
-            10L,
-            String.class,
-            2);
-
         if (emitFinal) {
-            // Output just new closed window for C
-            expectResult = asList(
-                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)),
-                    "0+L3,R3", 25),
+            windowedMessages = receiveMessagesWithTimestamp(
+                    new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+                    new StringDeserializer(),
+                    10L,
+                    String.class,
+                    1);
+
+            // Output just new/unexpired closed window for C
+            expectResult = Collections.singletonList(
                 new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)),
                     "0+L3,R3", 25)
             );
         } else {
+            windowedMessages = receiveMessagesWithTimestamp(
+                    new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+                    new StringDeserializer(),
+                    10L,
+                    String.class,
+                    2);
+
             expectResult = asList(
                 new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(30L, 40L)),
                     "0+L3,R3", 35),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index df95103791d..796afd38845 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -1650,22 +1650,7 @@ public class KStreamSlidingWindowAggregateTest {
         final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
 
         if (emitFinal) {
-            expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
-            expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
-            expected.put(4L, ValueAndTimestamp.make("ATU", 10L));
-            expected.put(5L, ValueAndTimestamp.make("ABTU", 15L));
-            expected.put(6L, ValueAndTimestamp.make("ABCU", 16L));
-            expected.put(8L, ValueAndTimestamp.make("ABCDU", 18L));
-            expected.put(9L, ValueAndTimestamp.make("ABCD", 18L));
-            expected.put(11L, ValueAndTimestamp.make("BCD", 18L));
-            expected.put(16L, ValueAndTimestamp.make("CD", 18L));
-            expected.put(17L, ValueAndTimestamp.make("D", 18L));
-            expected.put(20L, ValueAndTimestamp.make("E", 30L));
-            expected.put(30L, ValueAndTimestamp.make("EF", 40L));
-            expected.put(31L, ValueAndTimestamp.make("F", 40L));
-            expected.put(45L, ValueAndTimestamp.make("G", 55L));
-            expected.put(46L, ValueAndTimestamp.make("GH", 56L));
-            expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
+            // only non-expired records
             expected.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
             expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
             expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
@@ -1675,6 +1660,7 @@ public class KStreamSlidingWindowAggregateTest {
             expected.put(66L, ValueAndTimestamp.make("O", 76L));
             expected.put(67L, ValueAndTimestamp.make("OP", 77L));
             expected.put(70L, ValueAndTimestamp.make("OPQ", 80L));
+
         } else {
             expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
             expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 8af320ae705..124bb5593b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -286,11 +286,12 @@ public class KStreamWindowAggregateTest {
         inputTopic1.pipeInput("A", "1", 20L);
 
         processors.get(0).checkAndClearProcessResult(
-            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 10),
-            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+2+2", 13),
-            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3", 14),
-            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+4", 12)
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 10),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+2+2", 13),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3", 14),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+4", 12)
         );
+
         processors.get(1).checkAndClearProcessResult();
         processors.get(2).checkAndClearProcessResult();
 
@@ -301,18 +302,24 @@ public class KStreamWindowAggregateTest {
         inputTopic2.pipeInput("A", "a", 15L);
 
         processors.get(0).checkAndClearProcessResult();
-        processors.get(1).checkAndClearProcessResult(
-            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
-            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
-            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2)
-        );
-        processors.get(2).checkAndClearProcessResult(
-            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),
-                "0+1+1%0+a", 9),
-            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),
-                "0+2%0+b", 1),
-            new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3%0+c",
-                2));
+
+        if (withCache) {
+            processors.get(1).checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2)
+            );
+            processors.get(2).checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),
+                            "0+1+1%0+a", 9),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),
+                            "0+2%0+b", 1),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3%0+c",
+                            2));
+        } else {
+            processors.get(0).checkAndClearProcessResult();
+            processors.get(2).checkAndClearProcessResult();
+        }
 
         inputTopic2.pipeInput("A", "a", 5L);
         inputTopic2.pipeInput("B", "b", 6L);
@@ -321,11 +328,23 @@ public class KStreamWindowAggregateTest {
         inputTopic2.pipeInput("A", "a", 21L);
 
         processors.get(0).checkAndClearProcessResult();
-        processors.get(1).checkAndClearProcessResult(
-            new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
-            new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
-            new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
-        );
+        if (withCache) {
+            processors.get(1).checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
+                    new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
+            );
+        } else {
+            processors.get(1).checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
+                    new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2),
+                    new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
+                    new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
+                    new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
+            );
+
+        }
         processors.get(2).checkAndClearProcessResult(
             new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1%0+a",
                 10),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 3356b37c408..03ac8ea4041 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -233,12 +233,21 @@ public class SessionWindowedKStreamImplTest {
             processData(driver);
             final SessionStore<String, Long> store = driver.getSessionStore("count-store");
             final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
-            assertThat(
-                data,
-                equalTo(Arrays.asList(
-                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
-                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
-                    KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
+            if (!emitFinal) {
+                assertThat(
+                        data,
+                        equalTo(Arrays.asList(
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+                                KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
+            } else {
+                assertThat(
+                        data,
+                        equalTo(Arrays.asList(
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+                                KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
+
+            }
         }
     }
 
@@ -251,12 +260,21 @@ public class SessionWindowedKStreamImplTest {
             final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced");
             final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
 
-            assertThat(
-                data,
-                equalTo(Arrays.asList(
-                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
-                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
-                    KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
+            if (!emitFinal) {
+                assertThat(
+                        data,
+                        equalTo(Arrays.asList(
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+                                KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
+            } else {
+                assertThat(
+                        data,
+                        equalTo(Arrays.asList(
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+                                KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
+
+            }
         }
     }
 
@@ -272,12 +290,21 @@ public class SessionWindowedKStreamImplTest {
             processData(driver);
             final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated");
             final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
-            assertThat(
-                data,
-                equalTo(Arrays.asList(
-                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
-                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
-                    KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
+            if (!emitFinal) {
+                assertThat(
+                        data,
+                        equalTo(Arrays.asList(
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+                                KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
+            } else {
+                assertThat(
+                        data,
+                        equalTo(Arrays.asList(
+                                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+                                KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
+
+            }
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 5ac43ac8082..6aa4d17dca2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -51,6 +51,7 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.Collections;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -198,6 +199,7 @@ public class TimeWindowedKStreamImplTest {
         }
 
         final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed = supplier.theCapturedProcessor().processed();
+
         if (emitFinal) {
             assertEquals(
                 asList(
@@ -238,11 +240,26 @@ public class TimeWindowedKStreamImplTest {
                 final List<KeyValue<Windowed<String>, Long>> data =
                     StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
 
-                assertThat(data, equalTo(asList(
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+                if (withCache) {
+                    // with cache, all records are returned (including those already expired in
+                    // the underlying store) as part of the merge process
+                    assertThat(data, equalTo(asList(
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+                } else {
+                    // without cache, we get only non-expired records from the underlying store.
+                    if (!emitFinal) {
+                        assertThat(data, equalTo(Collections.singletonList(
+                                KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+                    } else {
+                        assertThat(data, equalTo(asList(
+                                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
+                                KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+                    }
+                }
             }
             {
                 final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
@@ -250,11 +267,24 @@ public class TimeWindowedKStreamImplTest {
                 final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
                     StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
 
-                assertThat(data, equalTo(asList(
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+                // the same values and logic described above applies here as well.
+                if (withCache) {
+                    assertThat(data, equalTo(asList(
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+                } else {
+                    if (!emitFinal) {
+                        assertThat(data, equalTo(Collections.singletonList(
+                                KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+                    } else {
+                        assertThat(data, equalTo(asList(
+                                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
+                                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
+                                KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+                    }
+                }
             }
         }
     }
@@ -274,22 +304,37 @@ public class TimeWindowedKStreamImplTest {
                 final List<KeyValue<Windowed<String>, String>> data =
                     StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
 
-                assertThat(data, equalTo(asList(
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
+                if (withCache) {
+                    // with cache returns all records (expired from underneath as well) as part of
+                    // the merge process
+                    assertThat(data, equalTo(asList(
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
+                } else {
+                    // without cache, we get only non-expired records from the underlying store.
+                    // actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501.
+                    // only 1 record is non-expired and is returned.
+                    assertThat(data, equalTo(Collections.singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
+                }
             }
             {
                 final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("reduced");
                 final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
                     StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
 
-                assertThat(data, equalTo(asList(
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+                // same logic/data as explained above.
+                if (withCache) {
+                    assertThat(data, equalTo(asList(
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+                } else {
+                    assertThat(data, equalTo(Collections.singletonList(
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+                }
             }
         }
     }
@@ -310,22 +355,36 @@ public class TimeWindowedKStreamImplTest {
                 final List<KeyValue<Windowed<String>, String>> data =
                     StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
 
-                assertThat(data, equalTo(asList(
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
+                if (withCache) {
+                    // with cache returns all records (expired from underneath as well) as part of
+                    // the merge process
+                    assertThat(data, equalTo(asList(
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
+                } else {
+                    // without cache, we get only non-expired records from the underlying store.
+                    // actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501.
+                    // only 1 record is non-expired and is returned.
+                    assertThat(data, equalTo(Collections
+                            .singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
+                }
             }
             {
                 final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("aggregated");
                 final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
                     StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
-
-                assertThat(data, equalTo(asList(
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
-                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+                if (withCache) {
+                    assertThat(data, equalTo(asList(
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
+                            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)),
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+                } else {
+                    assertThat(data, equalTo(Collections.singletonList(
+                            KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+                }
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 3644e8eaa6d..8a44b9ab501 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -179,11 +179,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
-            );
+            // For all tests, actualFrom is computed as observedStreamTime - retention + 1,
+            // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001.
+            // All records are expired because actualFrom (59001) is later than to (1000).
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
 
             assertEquals(expected, toList(values));
         }
@@ -191,11 +190,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
-            );
+            // all records expired as actual from is 59001 and to is 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
 
             assertEquals(expected, toList(values));
         }
@@ -203,20 +199,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
-            );
-
+            // all records expired as actual from is 59001 and to is 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
             assertEquals(expected, toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+            // key B is expired as actual from is 59001
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
@@ -226,10 +218,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             null, null, 0, windows[3].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+            // keys A and B expired as actual from is 59001
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
@@ -251,10 +241,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
-            );
+            // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+            // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+            // all records expired as actual from is 59001 and to = 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
 
             assertEquals(expected, toList(values));
         }
@@ -262,11 +252,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
-            );
+            // all records expired as actual from is 59001 and to = 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
 
             assertEquals(expected, toList(values));
         }
@@ -274,21 +261,17 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
-            );
-
+            // all records expired as actual from is 59001 and to = 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
             assertEquals(expected, toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+            // only 1 record left as actual from is 59001 and to = 60,000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+                    KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
             assertEquals(expected, toList(values));
@@ -297,11 +280,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             null, null, 0, windows[3].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+            // only 1 record left as actual from is 59001 and to = 60,000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+                    KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
             assertEquals(expected, toList(values));
@@ -854,18 +835,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3);
         bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4);
 
+        // Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000.
         final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
             key1, windows[0].start(), windows[0].end());
-        assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1));
+        assertNull(value1);
 
+        // Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000.
         final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
             key1, windows[1].start(), windows[1].end());
-        assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2));
+        assertNull(value2);
 
+        // expired record
+        // timestampFromRawKey = 1500 while observedStreamTime = 60,000 and retention = 1000.
         final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
             key2, windows[2].start(), windows[2].end());
-        assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3));
+        assertNull(value3);
 
+        // only non-expired record
+        // timestampFromRawKey = 60,000 while observedStreamTime = 60,000 and retention = 1000.
         final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
             key3, windows[3].start(), windows[3].end());
         assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
@@ -991,10 +978,26 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
             try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(
                 Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 1, 2000)) {
 
-                final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                    KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                    KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L)
-                );
+                final List<KeyValue<Windowed<String>, Long>> expected;
+
+                // actual from: observedStreamTime - retention + 1
+                if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+                    // For windowkeyschema, actual from is 1
+                    // observed stream time = 1000. Retention Period = 1000.
+                    // actual from = (1000 - 1000 + 1)
+                    // and search happens in the range 1-2000
+                    expected = asList(
+                            KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+                            KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L)
+                    );
+                } else {
+                    // For session key schema, actual from is 501
+                    // observed stream time = 1500. Retention Period = 1000.
+                    // actual from = (1500 - 1000 + 1)
+                    // and search happens in the range 501-2000
+                    expected = Collections.singletonList(KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L));
+                }
+
                 assertEquals(expected, toList(results));
             }
 
@@ -1010,11 +1013,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
         bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
         bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+        // actual from: observedStreamTime - retention + 1
+        // retention = 1000
         try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
-            );
+
+            final List<KeyValue<Windowed<String>, Long>> expected;
+
+            // actual from: observedStreamTime - retention + 1
+            if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+                // For windowkeyschema, actual from is 1
+                // observed stream time = 1000. actual from = (1000 - 1000 + 1)
+                // and search happens in the range 1-999
+                expected = asList(
+                        KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+                        KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+                );
+            } else {
+                // For session key schema, actual from is 501
+                // observed stream time = 1500. actual from = (1500 - 1000 + 1)
+                // and search happens in the range 501-999, deeming the first record expired.
+                expected = Collections.singletonList(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+            }
 
             assertEquals(expected, toList(results));
         }
@@ -1054,15 +1073,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
 
+        // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+        // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+        // don't return expired records.
         assertEquals(
-            asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
-                KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
-                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
-            ),
+            Collections.emptyList(),
             results
         );
 
+        final List<KeyValue<Windowed<String>, Long>> results1 = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 59000, 60000));
+
+        // only non expired record as actual from is 59001
+        assertEquals(
+            Collections.singletonList(
+                KeyValue.pair(new Windowed<>(key, windows[3]), 1000L)
+            ),
+                results1
+        );
+
         segments.close();
     }
 
@@ -1086,9 +1114,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         );
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
+        // actualFrom is computed using observedStreamTime - retention + 1.
+        // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+        // only one record returned as actual from is 59001
         assertEquals(
-            asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L),
+            Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
             ),
             results
@@ -1115,12 +1145,13 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
             ),
             segmentDirs()
         );
-
+        // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+        // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+        // key A expired as actual from is 59,001
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.backwardAll());
         assertEquals(
-            asList(
-                KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L)
+            Collections.singletonList(
+                KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
             ),
             results
         );
@@ -1147,9 +1178,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         );
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
+        // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+        // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+        // only 1 record fetched as actual from is 59001
         assertEquals(
-            asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+            Collections.singletonList(
                 KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
             ),
             results
@@ -1277,9 +1310,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         assertEquals(2, bytesStore.getSegments().size());
 
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
 
+        // after restoration, only 1 record should be returned as actual from is 59001 and the prior record is expired.
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
         assertEquals(expected, results);
     }
@@ -1332,10 +1365,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
 
         final String key = "a";
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
 
+        // after restoration, only the single non-expired record should be returned, as actual from is 59001
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
         assertEquals(expected, results);
         assertThat(bytesStore.getPosition(), Matchers.notNullValue());
@@ -1368,9 +1400,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         assertEquals(2, bytesStore.getSegments().size());
 
         final String key = "a";
+
+        // only non expired record as actual from is 59001
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -1407,7 +1439,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         assertEquals(1, bytesStore.getSegments().size());
         final String key = "a";
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+
+        // actual from = observedStreamTime - retention + 1.
+        // retention = 1000
+        if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+            // For window stores, observedStreamTime = 1000 => actualFrom = 1
+            // For session stores, observedStreamTime = 1500 => actualFrom = 501 which deems
+            // the below record as expired.
+            expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+        }
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
         assertEquals(expected, results);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 32e33860ec9..b82f544026e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -171,44 +171,30 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
-            );
-
-            assertEquals(expected, toList(values));
+            // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            assertEquals(Collections.emptyList(), toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
-            );
-
-            assertEquals(expected, toList(values));
+            // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            assertEquals(Collections.emptyList(), toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
-            );
-
-            assertEquals(expected, toList(values));
+            // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            assertEquals(Collections.emptyList(), toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+            // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
@@ -217,11 +203,9 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             null, null, 0, windows[3].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+            // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
                 KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
@@ -242,44 +226,33 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
-            );
-
-            assertEquals(expected, toList(values));
+            // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            assertEquals(Collections.emptyList(), toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
-            );
-
-            assertEquals(expected, toList(values));
+            // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            assertEquals(Collections.emptyList(), toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
-            );
-
-            assertEquals(expected, toList(values));
+            // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            assertEquals(Collections.emptyList(), toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+            // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
             assertEquals(expected, toList(values));
@@ -287,12 +260,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
             null, null, 0, windows[3].start())) {
-
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+            // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+            // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+                KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
             );
 
             assertEquals(expected, toList(values));
@@ -306,10 +277,18 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
         bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
         try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
-            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
-            );
+            final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
+            /*
+            * For WindowKeySchema, the observedStreamTime is 1000 which means 1 extra record gets returned while for
+            * SessionKeySchema, it's 1500. Which changes the actual-from while fetching. In case of SessionKeySchema, the
+            * fetch happens from 501-999 while for WindowKeySchema it's from 1-999.
+            */
+            if (schema instanceof SessionKeySchema) {
+                expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+            } else {
+                expected.add(KeyValue.pair(new Windowed<>(key, windows[0]), 10L));
+                expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+            }
 
             assertEquals(expected, toList(results));
         }
@@ -341,16 +320,13 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs());
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
-
+        /*
+         * All records are expired: observed stream time = 60,000, which sets actual-from to 59001 (60,000 - 1000 + 1), while to = 1500.
+         */
         assertEquals(
-            Arrays.asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
-                KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
-                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
-            ),
+            Collections.emptyList(),
             results
         );
-
         segments.close();
     }
 
@@ -371,11 +347,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
             ),
             segmentDirs()
         );
-
+        /*
+        * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+         */
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
         assertEquals(
-            Arrays.asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+            Collections.singletonList(
                 KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
             ),
             results
@@ -401,11 +378,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
             ),
             segmentDirs()
         );
-
+        /*
+         * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = 60,000.
+         */
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
         assertEquals(
-            Arrays.asList(
-                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+            Collections.singletonList(
                 KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
             ),
             results
@@ -529,8 +507,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         // 2 segments are created during restoration.
         assertEquals(2, bytesStore.getSegments().size());
 
+        /*
+         * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+         */
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -584,9 +564,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         assertEquals(2, bytesStore.getSegments().size());
 
         final String key = "a";
+        /*
+         * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+         */
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -621,9 +602,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         assertEquals(2, bytesStore.getSegments().size());
 
         final String key = "a";
+        /*
+         * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+         */
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -659,11 +641,20 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         // 1 segments are created during restoration.
         assertEquals(1, bytesStore.getSegments().size());
         final String key = "a";
-        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
-        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
 
+        /*
+         * The observed stream time is 1000 for WindowKeySchema but 1500 for SessionKeySchema, which changes the
+         * actual-from used while fetching: SessionKeySchema fetches from 501 to the end and returns nothing, while
+         * WindowKeySchema fetches from 1 to the end and so returns 1 extra record.
+         */
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
-        assertEquals(expected, results);
+        if (schema instanceof SessionKeySchema) {
+            assertEquals(Collections.emptyList(), results);
+        } else {
+            final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
+            expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+            assertEquals(expected, results);
+        }
         assertThat(bytesStore.getPosition(), Matchers.notNullValue());
         assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index e93f758c5cf..c69dedc91db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -342,7 +342,7 @@ public abstract class AbstractWindowBytesStoreTest {
         );
         assertEquals(
             asList(zero, one, two, three),
-            toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
+            toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3)))
         );
         assertEquals(
             asList(one, two, three, four, five),
@@ -360,7 +360,7 @@ public abstract class AbstractWindowBytesStoreTest {
         );
         assertEquals(
             asList(three, two, one, zero),
-            toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
+            toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3)))
         );
         assertEquals(
             asList(five, four, three, two, one),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 83136c33e81..72779314e85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -101,7 +101,8 @@ public class CachingPersistentWindowStoreTest {
     @Before
     public void setUp() {
         keySchema = new WindowKeySchema();
-        bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema);
+        // KAFKA-12960: Use a retention of 100 ms so that all existing test cases keep working as is.
+        bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, keySchema);
         underlyingStore = new RocksDBWindowStore(bytesStore, false, WINDOW_SIZE);
         final TimeWindowedDeserializer<String> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
         keyDeserializer.setIsChangelogTopic(true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 922608d4994..044f9484378 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -95,6 +95,7 @@ public class MeteredSessionStoreTest {
     private static final byte[] VALUE_BYTES = VALUE.getBytes();
     private static final long START_TIMESTAMP = 24L;
     private static final long END_TIMESTAMP = 42L;
+    private static final int RETENTION_PERIOD = 100;
 
     private final String threadId = Thread.currentThread().getName();
     private final TaskId taskId = new TaskId(0, 0, "My-Topology");
@@ -429,6 +430,54 @@ public class MeteredSessionStoreTest {
         verify(innerStore);
     }
 
+    @Test
+    public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() {
+        final long systemTime = Time.SYSTEM.milliseconds();
+        expect(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+                .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        init();
+
+        final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+    }
+
+    @Test
+    public void shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() {
+        final long systemTime = Time.SYSTEM.milliseconds();
+        expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+                .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        init();
+
+        final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+    }
+
+    @Test
+    public void shouldNotFindExpiredSessionRangeFromStore() {
+        final long systemTime = Time.SYSTEM.milliseconds();
+        expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+                .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        init();
+
+        final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+    }
+
+    @Test
+    public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() {
+        final long systemTime = Time.SYSTEM.milliseconds();
+        expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+                .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        init();
+
+        final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+    }
+
     @Test
     public void shouldRecordRestoreTimeOnInit() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 61a1a08975d..fcad32e0063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -66,6 +66,7 @@ public class MeteredTimestampedWindowStoreTest {
         ValueAndTimestamp.make("value", TIMESTAMP);
     private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\0\0\0\0\0\0\0avalue".getBytes();
     private static final int WINDOW_SIZE_MS = 10;
+    private static final int RETENTION_PERIOD = 100;
 
     private InternalMockProcessorContext context;
     private final TaskId taskId = new TaskId(0, 0, "My-Topology");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index ca6a518eb4c..20eb5ec88a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -82,6 +83,7 @@ public class MeteredWindowStoreTest {
     private static final String VALUE = "value";
     private static final byte[] VALUE_BYTES = VALUE.getBytes();
     private static final int WINDOW_SIZE_MS = 10;
+    private static final int RETENTION_PERIOD = 100;
     private static final long TIMESTAMP = 42L;
 
     private final String threadId = Thread.currentThread().getName();
@@ -270,6 +272,18 @@ public class MeteredWindowStoreTest {
         verify(innerStoreMock);
     }
 
+    @Test
+    public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + RETENTION_PERIOD))
+                .andReturn(KeyValueIterators.emptyWindowStoreIterator());
+        replay(innerStoreMock);
+
+        store.init((StateStoreContext) context, store);
+        store.fetch("a", ofEpochMilli(1), ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded on close;
+
+        verify(innerStoreMock);
+    }
+
     @Test
     public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 8a849d86bcb..0f73518b112 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -17,16 +17,34 @@
 package org.apache.kafka.streams.state.internals;
 
 import java.util.Collection;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.Stores;
 
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.jupiter.api.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 @RunWith(Parameterized.class)
 public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
@@ -90,4 +108,79 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
         }
     }
 
+
+    @Test
+    public void shouldNotFetchExpiredSessions() {
+        final long systemTime = Time.SYSTEM.milliseconds();
+        sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - 3 * RETENTION_PERIOD, systemTime - 2 * RETENTION_PERIOD)), 1L);
+        sessionStore.put(new Windowed<>("q", new SessionWindow(systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)), 4L);
+        sessionStore.put(new Windowed<>("r", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 3L);
+        sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 2L);
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                     sessionStore.findSessions("p", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+        ) {
+            assertEquals(mkSet(2L), valuesToSet(iterator));
+        }
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                     sessionStore.backwardFindSessions("p", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
+        ) {
+            assertFalse(iterator.hasNext());
+        }
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                     sessionStore.findSessions("p", "r", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
+        ) {
+            assertFalse(iterator.hasNext());
+        }
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                     sessionStore.findSessions("p", "r", systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)
+        ) {
+            assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+        }
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                     sessionStore.findSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+        ) {
+            assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+        }
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+                     sessionStore.backwardFindSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+        ) {
+            assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+        }
+    }
+
+    @Test
+    public void shouldRemoveExpired() {
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L);
+        sessionStore.put(new Windowed<>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L);
+
+        // Advance stream time to expire the first record
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * SEGMENT_INTERVAL)), 4L);
+
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+            sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
+        ) {
+            // The 2 records with values 2L and 3L are considered expired as
+            // their end times < observed stream time - retentionPeriod + 1.
+            assertEquals(valuesToSet(iterator), new HashSet<>(Collections.singletonList(4L)));
+        }
+    }
+
+    @Test
+    public void shouldMatchPositionAfterPut() {
+        final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
+        final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
+        final WrappedStateStore rocksDBSessionStore = (WrappedStateStore) changeLoggingSessionBytesStore.wrapped();
+
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
+        sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L);
+        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
+        sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L);
+        context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()));
+        sessionStore.put(new Windowed<String>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L);
+
+        final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L)))));
+        final Position actual = rocksDBSessionStore.getPosition();
+        assertEquals(expected, actual);
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c0c7e963e6e..d2f5289a1ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -187,19 +187,40 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
             ),
             segmentDirs(baseDir)
         );
-
+        // In all of these tests, WindowStore computes actualFrom as observedStreamTime - retention + 1,
+        // while the TimeOrderedWindowStores use actualFrom = observedStreamTime - retention.
+        // expired record
         assertEquals(
-            new HashSet<>(Collections.singletonList("zero")),
+            new HashSet<>(Collections.emptyList()),
             valuesToSet(windowStore.fetch(
                 0,
                 ofEpochMilli(startTime - WINDOW_SIZE),
                 ofEpochMilli(startTime + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(Collections.singletonList("one")),
-            valuesToSet(windowStore.fetch(
-                1,
-                ofEpochMilli(startTime + increment - WINDOW_SIZE),
-                ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+        // RocksDBWindowStore =>
+        //  from = 149997
+        //  to = 150003
+        //  actualFrom = 150001
+        // Record "one" has timestamp 150,000, so it is ignored.
+        // RocksDBTimeOrderedWindowStore*Index =>
+        //  from = 149997
+        //  to = 150003
+        //  actualFrom = 150000, hence not ignored
+        if (storeType == StoreType.RocksDBWindowStore) {
+            assertEquals(
+                new HashSet<>(Collections.emptyList()),
+                valuesToSet(windowStore.fetch(
+                    1,
+                        ofEpochMilli(startTime + increment - WINDOW_SIZE),
+                        ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+
+        } else {
+            assertEquals(
+                new HashSet<>(Collections.singletonList("one")),
+                valuesToSet(windowStore.fetch(
+                        1,
+                        ofEpochMilli(startTime + increment - WINDOW_SIZE),
+                        ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+        }
         assertEquals(
             new HashSet<>(Collections.singletonList("two")),
             valuesToSet(windowStore.fetch(
@@ -247,12 +268,32 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                 1,
                 ofEpochMilli(startTime + increment - WINDOW_SIZE),
                 ofEpochMilli(startTime + increment + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(Collections.singletonList("two")),
-            valuesToSet(windowStore.fetch(
-                2,
-                ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
-                ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
+        // RocksDBWindowStore =>
+        //  from = 179997
+        //  to = 180003
+        //  actualFrom = 180001
+        // Record "two" has timestamp 180,000, so it is ignored.
+        // RocksDBTimeOrderedWindowStore*Index =>
+        //  from = 179997
+        //  to = 180003
+        //  actualFrom = 180000, hence not ignored
+        if (storeType == StoreType.RocksDBWindowStore) {
+            assertEquals(
+                    // expired record
+                    new HashSet<>(Collections.emptyList()),
+                    valuesToSet(windowStore.fetch(
+                            2,
+                            ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
+                            ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
+        } else {
+            assertEquals(
+                    // record is not expired for the time-ordered stores
+                    new HashSet<>(Collections.singletonList("two")),
+                    valuesToSet(windowStore.fetch(
+                            2,
+                            ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
+                            ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
+        }
         assertEquals(
             new HashSet<>(Collections.emptyList()),
             valuesToSet(windowStore.fetch(
@@ -301,7 +342,8 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                 ofEpochMilli(startTime + increment - WINDOW_SIZE),
                 ofEpochMilli(startTime + increment + WINDOW_SIZE))));
         assertEquals(
-            new HashSet<>(Collections.singletonList("two")),
+            // expired record
+            new HashSet<>(Collections.emptyList()),
             valuesToSet(windowStore.fetch(
                 2,
                 ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
@@ -371,12 +413,24 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                 3,
                 ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
                 ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(Collections.singletonList("four")),
-            valuesToSet(windowStore.fetch(
-                4,
-                ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
-                ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+        if (storeType == StoreType.RocksDBWindowStore) {
+            assertEquals(
+                    // expired record
+                    new HashSet<>(Collections.emptyList()),
+                    valuesToSet(windowStore.fetch(
+                            4,
+                            ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+                            ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+        } else {
+            assertEquals(
+                    // record is not expired for the time-ordered stores
+                    new HashSet<>(Collections.singletonList("four")),
+                    valuesToSet(windowStore.fetch(
+                            4,
+                            ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+                            ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+
+        }
         assertEquals(
             new HashSet<>(Collections.singletonList("five")),
             valuesToSet(windowStore.fetch(
@@ -465,7 +519,14 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
             iter.next();
             fetchedCount++;
         }
-        assertEquals(2, fetchedCount);
+        // 1 extra record is expired in the case of RocksDBWindowStore as
+        // actualFrom = observedStreamTime - retentionPeriod + 1. The +1
+        // isn't present for RocksDbTimeOrderedStoreWith*Index
+        if (storeType == StoreType.RocksDBWindowStore) {
+            assertEquals(1, fetchedCount);
+        } else {
+            assertEquals(2, fetchedCount);
+        }
 
         assertEquals(
             Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
@@ -480,8 +541,10 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
             iter.next();
             fetchedCount++;
         }
-        assertEquals(1, fetchedCount);
 
+        // The latest record has a timestamp > 60k, so the +1 in the actualFrom calculation in
+        // RocksDBWindowStore has no implication here and all stores should return the same fetched count.
+        assertEquals(1, fetchedCount);
         assertEquals(
             Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
             segmentDirs(baseDir)
@@ -564,6 +627,9 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                                        Serdes.String());
         windowStore.init((StateStoreContext) context, windowStore);
 
+        // In all of these tests, WindowStore computes actualFrom as observedStreamTime - retention + 1,
+        // while the TimeOrderedWindowStores use actualFrom = observedStreamTime - retention.
+
         assertEquals(
             new HashSet<>(Collections.emptyList()),
             valuesToSet(windowStore.fetch(
@@ -650,12 +716,31 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                 3,
                 ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
                 ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(Collections.singletonList("four")),
-            valuesToSet(windowStore.fetch(
-                4,
-                ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
-                ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+        // RocksDBWindowStore =>
+        //  from = 239,997
+        //  to = 240,003
+        //  actualFrom = 240,001
+        // Record "four" has timestamp 240,000, so it is ignored.
+        // RocksDBTimeOrderedWindowStore*Index =>
+        //  from = 239,997
+        //  to = 240,003
+        //  actualFrom = 240,000, hence not ignored
+        if (storeType == StoreType.RocksDBWindowStore) {
+            assertEquals(
+                    new HashSet<>(Collections.emptyList()),
+                    valuesToSet(windowStore.fetch(
+                            4,
+                            ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+                            ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+        } else {
+            assertEquals(
+                    new HashSet<>(Collections.singletonList("four")),
+                    valuesToSet(windowStore.fetch(
+                            4,
+                            ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+                            ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+
+        }
         assertEquals(
             new HashSet<>(Collections.singletonList("five")),
             valuesToSet(windowStore.fetch(