You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/09/11 21:39:27 UTC

[kafka] branch trunk updated: Adding reverse iterator usage for sliding windows processing (extending KIP-450) (#9239)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2194ccb  Adding reverse iterator usage for sliding windows processing (extending KIP-450) (#9239)
2194ccb is described below

commit 2194ccba5b95657ff7d6a4739edc98cf70f4472b
Author: leah <lt...@confluent.io>
AuthorDate: Fri Sep 11 16:38:17 2020 -0500

    Adding reverse iterator usage for sliding windows processing (extending KIP-450) (#9239)
    
    Add a backwardFetch call to the window store for sliding window
    processing. While the implementation works with the forward call
    to the window store, using backwardFetch allows for the iterator
    to be closed earlier, making implementation more efficient.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <vv...@apache.org>
---
 checkstyle/suppressions.xml                        |   3 +
 .../internals/KStreamSlidingWindowAggregate.java   | 156 +++++--
 .../state/internals/InMemoryWindowStore.java       |  10 +-
 .../kstream/internals/KGroupedStreamImplTest.java  | 121 ++++--
 .../KStreamSlidingWindowAggregateTest.java         | 480 +++++++++++++--------
 .../SlidingWindowedCogroupedKStreamImplTest.java   |  55 ++-
 .../kstream/internals/SuppressScenarioTest.java    |  98 +++--
 7 files changed, 605 insertions(+), 318 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cbb5105..9222a7d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -203,6 +203,9 @@
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
+    <suppress checks="MethodLength"
+              files="KStreamSlidingWindowAggregateTest.java"/>
+
     <!-- Streams test-utils -->
     <suppress checks="ClassFanOutComplexity"
               files="TopologyTestDriver.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index b013de3..250ea02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -83,6 +83,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
         private Sensor lateRecordDropSensor;
         private Sensor droppedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private Boolean reverseIteratorPossible = null;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -150,7 +151,22 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                 return;
             }
 
-            processInOrder(key, value, inputRecordTimestamp, closeTime);
+            if (reverseIteratorPossible == null) {
+                try {
+                    windowStore.backwardFetch(key, 0L, 0L);
+                    reverseIteratorPossible = true;
+                    log.debug("Sliding Windows aggregate using a reverse iterator");
+                } catch (final UnsupportedOperationException e)  {
+                    reverseIteratorPossible = false;
+                    log.debug("Sliding Windows aggregate using a forward iterator");
+                }
+            }
+
+            if (reverseIteratorPossible) {
+                processReverse(key, value, inputRecordTimestamp, closeTime);
+            } else {
+                processInOrder(key, value, inputRecordTimestamp, closeTime);
+            }
         }
 
         public void processInOrder(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
@@ -172,7 +188,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                     key,
                     key,
                     Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
-                    // to catch the current record's right window, if it exists, without more calls to the store
+                    // add 1 to upper bound to catch the current record's right window, if it exists, without more calls to the store
                     inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
@@ -205,32 +221,66 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // add 1 to upper bound to catch the current record's right window, if it exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final long startTime = windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    } else if (endTime > inputRecordTimestamp) {
+                        if (rightWinAgg == null) {
+                            rightWinAgg = windowBeingProcessed.value;
+                        }
+                        updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (endTime == inputRecordTimestamp) {
+                        leftWinAlreadyCreated = true;
+                        updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                        if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        } else {
+                            return;
+                        }
+                    } else if (endTime < inputRecordTimestamp) {
+                        leftWinAgg = windowBeingProcessed.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        log.error(
+                            "Unexpected window with start {} found when processing record at {} in `KStreamSlidingWindowAggregate`.",
+                            startTime, inputRecordTimestamp
+                        );
+                        throw new IllegalStateException("Unexpected window found when processing sliding windows");
+                    }
                 }
-                final TimeWindow window = new TimeWindow(inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp);
-                updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-            }
-            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, inputRecordTimestamp)) {
-                createCurrentRecordRightWindow(inputRecordTimestamp, rightWinAgg, key);
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
         }
 
         /**
@@ -260,7 +310,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                     key,
                     key,
                     0,
-                    // to catch the current record's right window, if it exists, without more calls to the store
+                    // add 1 to upper bound to catch the current record's right window, if it exists, without more calls to the store
                     inputRecordTimestamp + 1)
             ) {
                 while (iterator.hasNext()) {
@@ -310,9 +360,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
 
             //create the right window for the previous record if the previous record exists and the window hasn't already been created
             if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
-                final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
+                createPreviousRecordRightWindow(previousRecordTimestamp + 1, inputRecordTimestamp, key, value, closeTime);
             }
 
             if (combinedWindow == null) {
@@ -327,6 +375,42 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
 
         }
 
+        private void createWindows(final K key,
+                                   final V value,
+                                   final long inputRecordTimestamp,
+                                   final long closeTime,
+                                   final Set<Long> windowStartTimes,
+                                   final ValueAndTimestamp<Agg> rightWinAgg,
+                                   final ValueAndTimestamp<Agg> leftWinAgg,
+                                   final boolean leftWinAlreadyCreated,
+                                   final boolean rightWinAlreadyCreated,
+                                   final Long previousRecordTimestamp) {
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (previousRecordRightWindowDoesNotExistAndIsNotEmpty(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
+                    createPreviousRecordRightWindow(previousRightWinStart, inputRecordTimestamp, key, value, closeTime);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
+                } else {
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+                }
+                final TimeWindow window = new TimeWindow(inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp);
+                updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
+            }
+
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, inputRecordTimestamp)) {
+                createCurrentRecordRightWindow(inputRecordTimestamp, rightWinAgg, key);
+            }
+        }
+
         private void createCurrentRecordRightWindow(final long inputRecordTimestamp,
                                                     final ValueAndTimestamp<Agg> rightWinAgg,
                                                     final K key) {
@@ -342,15 +426,25 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                 rightWinAgg.timestamp());
         }
 
+        private void createPreviousRecordRightWindow(final long windowStart,
+                                                     final long inputRecordTimestamp,
+                                                     final K key,
+                                                     final V value,
+                                                     final long closeTime) {
+            final TimeWindow window = new TimeWindow(windowStart, windowStart + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
+        }
+
         // checks if the previous record falls into the current records left window; if yes, the left window is not empty, otherwise it is empty
         private boolean leftWindowNotEmpty(final Long previousRecordTimestamp, final long inputRecordTimestamp) {
             return previousRecordTimestamp != null && inputRecordTimestamp - windows.timeDifferenceMs() <= previousRecordTimestamp;
         }
 
         // checks if the previous record's right window does not already exist and the current record falls within previous record's right window
-        private boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes,
-                                                        final long previousRightWindowStart,
-                                                        final long inputRecordTimestamp) {
+        private boolean previousRecordRightWindowDoesNotExistAndIsNotEmpty(final Set<Long> windowStartTimes,
+                                                                           final long previousRightWindowStart,
+                                                                           final long inputRecordTimestamp) {
             return !windowStartTimes.contains(previousRightWindowStart) && previousRightWindowStart + windows.timeDifferenceMs() >= inputRecordTimestamp;
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index b538edc..49322b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -68,11 +68,11 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
 
     private volatile boolean open = false;
 
-    InMemoryWindowStore(final String name,
-                        final long retentionPeriod,
-                        final long windowSize,
-                        final boolean retainDuplicates,
-                        final String metricScope) {
+    public InMemoryWindowStore(final String name,
+                               final long retentionPeriod,
+                               final long windowSize,
+                               final boolean retainDuplicates,
+                               final String metricScope) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.windowSize = windowSize;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index a23b98d..00925c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -50,7 +50,9 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Properties;
 
@@ -239,52 +241,81 @@ public class KGroupedStreamImplTest {
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =
+            Comparator.comparing((KeyValueTimestamp<Windowed<String>, Long> o) -> o.key().key())
+                .thenComparing((KeyValueTimestamp<Windowed<String>, Long> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> actual = supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+
+        assertThat(actual, equalTo(Arrays.asList(
+            // processing A@500
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
+            // processing A@600
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
+            // processing A@999
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
+            // processing A@600
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
+            // processing first A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
+            // processing second A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
+            // processing A@999
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
+            // processing A@600
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
+            // processing first A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
+            // processing second A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
+            // processing A@600
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
+            // processing first A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
+            // processing second A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
+            // processing first A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
+            // processing second A@1000
+            new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
+
+            // processing B@500
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
+            // processing B@600
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
+            // processing B@700
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
+            // processing first B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
+            // processing second B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
+            // processing B@600
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
+            // processing B@700
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
+            // processing first B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
+            // processing second B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
+            // processing B@700
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
+            // processing first B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
+            // processing second B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
+            // processing first B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
+            // processing second B@1000
+            new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
+
+            // processing C@501
+            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
+            // processing C@600
+            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L),
+            // processing C@600
+            new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L)
         )));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 0a16266..0bb2c01 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -33,11 +32,18 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -47,9 +53,15 @@ import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.hamcrest.Matcher;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,7 +81,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class KStreamSlidingWindowAggregateTest {
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+            {false},
+            {true}
+        });
+    }
+
+    @Parameterized.Parameter
+    public boolean inOrderIterator;
+
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
 
@@ -79,6 +104,10 @@ public class KStreamSlidingWindowAggregateTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
 
+        final WindowBytesStoreSupplier storeSupplier =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
         final KTable<Windowed<String>, String> table = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -86,7 +115,7 @@ public class KStreamSlidingWindowAggregateTest {
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                Materialized.as(storeSupplier)
             );
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table.toStream().process(supplier);
@@ -131,6 +160,10 @@ public class KStreamSlidingWindowAggregateTest {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
 
         final KTable<Windowed<String>, String> table = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
@@ -138,7 +171,7 @@ public class KStreamSlidingWindowAggregateTest {
             .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
             .reduce(
                 MockReducer.STRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                Materialized.as(storeSupplier)
             );
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table.toStream().process(supplier);
@@ -184,15 +217,20 @@ public class KStreamSlidingWindowAggregateTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
+        final WindowBytesStoreSupplier storeSupplier =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
         final KTable<Windowed<String>, String> table2 = builder
-                .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
-                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-                .aggregate(
-                        MockInitializer.STRING_INIT,
-                        MockAggregator.TOSTRING_ADDER,
-                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
-                );
+            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.as(storeSupplier)
+            );
+
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
 
@@ -216,118 +254,119 @@ public class KStreamSlidingWindowAggregateTest {
             inputTopic1.pipeInput("C", "3", 16L);
             inputTopic1.pipeInput("C", "4", 21);
             inputTopic1.pipeInput("C", "5", 23L);
-
+            
             inputTopic1.pipeInput("D", "4", 11L);
             inputTopic1.pipeInput("D", "2", 12L);
             inputTopic1.pipeInput("D", "3", 29L);
             inputTopic1.pipeInput("D", "5", 16L);
         }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+            Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
 
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
         assertEquals(
-                asList(
-                        // FINAL WINDOW: A@10 left window created when A@10 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10),
-                        // A@10 right window created when A@20 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20),
-                        // A@20 left window created when A@20 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20),
-                        // FINAL WINDOW: A@20 right window created when A@22 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22),
-                        // A@22 left window created when A@22 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22),
-                        // FINAL WINDOW: A@20 left window updated when A@15 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20),
-                        // FINAL WINDOW: A@10 right window updated when A@15 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20),
-                        // FINAL WINDOW: A@22 left window updated when A@15 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22),
-                        // FINAL WINDOW: A@15 left window created when A@15 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15),
-                        // FINAL WINDOW: A@15 right window created when A@15 processed
-                        new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22),
-
-                        // FINAL WINDOW: B@12 left window created when B@12 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12),
-                        // B@12 right window created when B@13 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13),
-                        // FINAL WINDOW: B@13 left window created when B@13 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13),
-                        // B@12 right window updated when B@18 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3", 18),
-                        // B@13 right window created when B@18 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18),
-                        // B@18 left window created when B@18 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18),
-                        // B@12 right window updated when B@19 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19),
-                        // B@13 right window updated when B@19 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19),
-                        // B@18 right window created when B@19 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19),
-                        // B@19 left window created when B@19 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19),
-                        // FINAL WINDOW: B@18 right window updated when B@25 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25),
-                        // FINAL WINDOW: B@19 right window updated when B@25 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25),
-                        // FINAL WINDOW: B@25 left window created when B@25 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25),
-                        // FINAL WINDOW: B@18 left window updated when B@14 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18),
-                        // FINAL WINDOW: B@19 left window updated when B@14 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
-                        // FINAL WINDOW: B@12 right window updated when B@14 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19),
-                        // FINAL WINDOW: B@13 right window updated when B@14 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19),
-                        // FINAL WINDOW: B@14 left window created when B@14 processed
-                        new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14),
-
-                        // FINAL WINDOW: C@11 left window created when C@11 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11),
-                        // C@11 right window created when C@15 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15),
-                        // FINAL WINDOW: C@15 left window created when C@15 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15),
-                        // C@11 right window updated when C@16 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16),
-                        // C@15 right window created when C@16 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16),
-                        // FINAL WINDOW: C@16 left window created when C@16 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16),
-                        // FINAL WINDOW: C@11 right window updated when C@21 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21),
-                        // C@15 right window updated when C@21 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21),
-                        // C@16 right window created when C@21 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21),
-                        // FINAL WINDOW: C@21 left window created when C@21 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21),
-                        // FINAL WINDOW: C@15 right window updated when C@23 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23),
-                        // FINAL WINDOW: C@16 right window updated when C@23 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23),
-                        // FINAL WINDOW: C@21 right window created when C@23 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23),
-                        // FINAL WINDOW: C@23 left window created when C@23 processed
-                        new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23),
-
-                        // FINAL WINDOW: D@11 left window created when D@11 processed
-                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11),
-                        // D@11 right window created when D@12 processed
-                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12),
-                        // FINAL WINDOW: D@12 left window created when D@12 processed
-                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12),
-                        // FINAL WINDOW: D@29 left window created when D@29 processed
-                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29),
-                        // FINAL WINDOW: D@11 right window updated when D@16 processed
-                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16),
-                        // FINAL WINDOW: D@12 right window created when D@16 processed
-                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16),
-                        // FINAL WINDOW: D@16 left window created when D@16 processed
-                        new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16)
-                        ),
-                supplier.theCapturedProcessor().processed()
+            asList(
+                // FINAL WINDOW: A@10 left window created when A@10 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+1", 10),
+                // FINAL WINDOW: A@15 left window created when A@15 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+4", 15),
+                // A@20 left window created when A@20 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2", 20),
+                // FINAL WINDOW: A@20 left window updated when A@15 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10, 20)), "0+1+2+4", 20),
+                // A@10 right window created when A@20 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2", 20),
+                // FINAL WINDOW: A@10 right window updated when A@15 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(11, 21)), "0+2+4", 20),
+                // A@22 left window created when A@22 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3", 22),
+                // FINAL WINDOW: A@22 left window updated when A@15 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(12, 22)), "0+2+3+4", 22),
+                // FINAL WINDOW: A@15 right window created when A@15 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(16, 26)), "0+2+3", 22),
+                // FINAL WINDOW: A@20 right window created when A@22 processed
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(21, 31)), "0+3", 22),
+                // FINAL WINDOW: B@12 left window created when B@12 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(2, 12)), "0+1", 12),
+                // FINAL WINDOW: B@13 left window created when B@13 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(3, 13)), "0+1+2", 13),
+                // FINAL WINDOW: B@14 left window created when B@14 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(4, 14)), "0+1+2+6", 14),
+                // B@18 left window created when B@18 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3", 18),
+                // FINAL WINDOW: B@18 left window updated when B@14 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(8, 18)), "0+1+2+3+6", 18),
+                // B@19 left window created when B@19 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4", 19),
+                // FINAL WINDOW: B@19 left window updated when B@14 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(9, 19)), "0+1+2+3+4+6", 19),
+                // B@12 right window created when B@13 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2", 13),
+                // B@12 right window updated when B@18 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3", 18),
+                // B@12 right window updated when B@19 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4", 19),
+                // FINAL WINDOW: B@12 right window updated when B@14 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(13, 23)), "0+2+3+4+6", 19),
+                // B@13 right window created when B@18 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3", 18),
+                // B@13 right window updated when B@19 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4", 19),
+                // FINAL WINDOW: B@13 right window updated when B@14 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(14, 24)), "0+3+4+6", 19),
+                // FINAL WINDOW: B@25 left window created when B@25 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15, 25)), "0+3+4+5", 25),
+                // B@18 right window created when B@19 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4", 19),
+                // FINAL WINDOW: B@18 right window updated when B@25 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(19, 29)), "0+4+5", 25),
+                // FINAL WINDOW: B@19 right window updated when B@25 processed
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(20, 30)), "0+5", 25),
+                // FINAL WINDOW: C@11 left window created when C@11 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(1, 11)), "0+1", 11),
+                // FINAL WINDOW: C@15 left window created when C@15 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+1+2", 15),
+                // FINAL WINDOW: C@16 left window created when C@16 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(6, 16)), "0+1+2+3", 16),
+                // FINAL WINDOW: C@21 left window created when C@21 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(11, 21)), "0+1+2+3+4", 21),
+                // C@11 right window created when C@15 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2", 15),
+                // C@11 right window updated when C@16 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3", 16),
+                // FINAL WINDOW: C@11 right window updated when C@21 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(12, 22)), "0+2+3+4", 21),
+                // FINAL WINDOW: C@23 left window created when C@23 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(13, 23)), "0+2+3+4+5", 23),
+                // C@15 right window created when C@16 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3", 16),
+                // C@15 right window updated when C@21 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4", 21),
+                // FINAL WINDOW: C@15 right window updated when C@23 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(16, 26)), "0+3+4+5", 23),
+                // C@16 right window created when C@21 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4", 21),
+                // FINAL WINDOW: C@16 right window updated when C@23 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(17, 27)), "0+4+5", 23),
+                // FINAL WINDOW: C@21 right window created when C@23 processed
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(22, 32)), "0+5", 23),
+                // FINAL WINDOW: D@11 left window created when D@11 processed
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(1, 11)), "0+4", 11),
+                // FINAL WINDOW: D@12 left window created when D@12 processed
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(2, 12)), "0+4+2", 12),
+                // FINAL WINDOW: D@16 left window created when D@16 processed
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(6, 16)), "0+4+2+5", 16),
+                // D@11 right window created when D@12 processed
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2", 12),
+                // FINAL WINDOW: D@11 right window updated when D@16 processed
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(12, 22)), "0+2+5", 16),
+                // FINAL WINDOW: D@12 right window created when D@16 processed
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(13, 23)), "0+5", 16),
+                // FINAL WINDOW: D@29 left window created when D@29 processed
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(19, 29)), "0+3", 29)),
+            actual
         );
     }
 
@@ -336,29 +375,37 @@ public class KStreamSlidingWindowAggregateTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
         final String topic2 = "topic2";
+        final WindowBytesStoreSupplier storeSupplier1 =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder1", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse1", Duration.ofMillis(50000), Duration.ofMillis(10), false);
+        final WindowBytesStoreSupplier storeSupplier2 =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder2", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse2", Duration.ofMillis(50000), Duration.ofMillis(10), false);
 
         final KTable<Windowed<String>, String> table1 = builder
-                .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
-                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
-                .aggregate(
-                        MockInitializer.STRING_INIT,
-                        MockAggregator.TOSTRING_ADDER,
-                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
-                );
+            .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.as(storeSupplier1)
+            );
+        final KTable<Windowed<String>, String> table2 = builder
+            .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.as(storeSupplier2)
+            );
 
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table1.toStream().process(supplier);
 
-        final KTable<Windowed<String>, String> table2 = builder
-                .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
-                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
-                .aggregate(
-                        MockInitializer.STRING_INIT,
-                        MockAggregator.TOSTRING_ADDER,
-                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String())
-                );
         table2.toStream().process(supplier);
 
         table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier);
@@ -545,6 +592,10 @@ public class KStreamSlidingWindowAggregateTest {
     public void testEarlyRecordsLargeInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
 
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
@@ -553,8 +604,9 @@ public class KStreamSlidingWindowAggregateTest {
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                Materialized.as(storeSupplier)
             );
+
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table2.toStream().process(supplier);
 
@@ -572,52 +624,79 @@ public class KStreamSlidingWindowAggregateTest {
             inputTopic1.pipeInput("E", "8", 2L);
             inputTopic1.pipeInput("E", "9", 15L);
         }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+            Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
         assertEquals(
             asList(
                 // E@0
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1", 0),
                 // E@5
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5),
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3", 5),
                 // E@6
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6),
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4", 6),
                 // E@3
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6),
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2", 6),
-                //E@13
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13),
                 //E@10
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5", 10),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7", 10),
+                //E@2
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7+8", 10),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6),
+                //E@10
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5", 10),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13),
                 //E@4
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5+7", 10),
+                //E@2
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5+7+8", 10),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13),
+                //E@4
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5+7", 13),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13),
+                //E@4
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5+7", 13),
+                //E@4
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5", 13),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7", 10),
-                //E@2
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2+5+7+8", 10),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2+5+7+8", 10),
                 //E@15
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5+9", 15),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13),
+                //E@15
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5+9", 15),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13),
+                //E@15
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5+9", 15),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13),
+                //E@15
                 new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6+9", 15),
-                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15)
-            ),
-            supplier.theCapturedProcessor().processed()
-
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15)),
+            actual
         );
     }
 
@@ -648,18 +727,21 @@ public class KStreamSlidingWindowAggregateTest {
         final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
         stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90L)))
-                .aggregate(
-                        () -> "",
-                        MockAggregator.toStringInstance("+"),
-                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
-                )
-                .toStream()
-                .map((key, value) -> new KeyValue<>(key.toString(), value))
-                .to("output");
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.as(storeSupplier)
+            )
+            .toStream()
+            .to("output");
 
         props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
 
@@ -695,9 +777,9 @@ public class KStreamSlidingWindowAggregateTest {
                     // left window for k@15
                     "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[15] window=[5,15] expiration=[110] streamTime=[200]"
             ));
-            final TestOutputTopic<String, String> outputTopic =
-                    driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
-            assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>("[k@190/200]", "+100", null, 200L)));
+            final TestOutputTopic<Windowed<String>, String> outputTopic =
+                    driver.createOutputTopic("output", new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), new StringDeserializer());
+            assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>(new Windowed<>("k", new TimeWindow(190, 200)), "0+100", null, 200L)));
             assertTrue(outputTopic.isEmpty());
         }
     }
@@ -708,6 +790,10 @@ public class KStreamSlidingWindowAggregateTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
+        final WindowBytesStoreSupplier storeSupplier =
+            inOrderIterator
+                ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
+                : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);
 
         final KTable<Windowed<String>, String> table = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
@@ -724,7 +810,7 @@ public class KStreamSlidingWindowAggregateTest {
                     aggregate = String.valueOf(ch);
                     return aggregate;
                 },
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())
+                Materialized.as(storeSupplier)
             );
         final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         table.toStream().process(supplier);
@@ -877,4 +963,56 @@ public class KStreamSlidingWindowAggregateTest {
         assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
         assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
     }
-}
\ No newline at end of file
+
+    private static class InOrderMemoryWindowStore extends InMemoryWindowStore {
+        InOrderMemoryWindowStore(final String name,
+                        final long retentionPeriod,
+                        final long windowSize,
+                        final boolean retainDuplicates,
+                        final String metricScope) {
+            super(name, retentionPeriod, windowSize, retainDuplicates, metricScope);
+        }
+
+        @Override
+        public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final long timeFrom, final long timeTo) {
+            throw new UnsupportedOperationException("Backward fetch not supported here");
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom,
+                                                                       final Bytes keyTo,
+                                                                       final long timeFrom,
+                                                                       final long timeTo) {
+            throw new UnsupportedOperationException("Backward fetch not supported here");
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom, final long timeTo) {
+            throw new UnsupportedOperationException("Backward fetch not supported here");
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+            throw new UnsupportedOperationException("Backward fetch not supported here");
+        }
+    }
+
+    private static class InOrderMemoryWindowStoreSupplier extends InMemoryWindowBytesStoreSupplier {
+
+        InOrderMemoryWindowStoreSupplier(final String name,
+                                         final long retentionPeriod,
+                                         final long windowSize,
+                                         final boolean retainDuplicates) {
+            super(name, retentionPeriod, windowSize, retainDuplicates);
+        }
+
+        @Override
+        public WindowStore<Bytes, byte[]> get() {
+            return new InOrderMemoryWindowStore(name(),
+                retentionPeriod(),
+                windowSize(),
+                retainDuplicates(),
+                metricsScope());
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 41ac960..5c9ccb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -20,9 +20,12 @@ package org.apache.kafka.streams.kstream.internals;
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
+import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -145,7 +148,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
     }
 
     @Test
-    public void slidingWindowAggregateTestStreamsTest() {
+    public void slidingWindowAggregateStreamsTest() {
         final KTable<Windowed<String>, String> customers = windowedCogroupedStream.aggregate(
                 MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
@@ -165,26 +168,36 @@ public class SlidingWindowedCogroupedKStreamImplTest {
             testInputTopic.pipeInput("k2", "B", 504);
             testInputTopic.pipeInput("k1", "B", 504);
 
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 500);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 500);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 501);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A", 501);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 502);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A", 502);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B", 503);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 503);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A+B", 503);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B", 503);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 503);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A+B", 503);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B+B", 504);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B+B", 504);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 504);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+A+B+B", 504);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B+B", 504);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B+B", 504);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 504);
-            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+A+B+B", 504);
+            final Set<TestRecord<String, String>> results = new HashSet<>();
+            while (!testOutputTopic.isEmpty()) {
+                final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord();
+                final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
+                    realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
+                results.add(nonWindowedRecord);
+            }
+            final Set<TestRecord<String, String>> expected = new HashSet<>();
+            expected.add(new TestRecord<>("k1", "0+A", null, 500L));
+            expected.add(new TestRecord<>("k2", "0+A", null, 500L));
+            expected.add(new TestRecord<>("k2", "0+A", null, 501L));
+            expected.add(new TestRecord<>("k2", "0+A+A", null, 501L));
+            expected.add(new TestRecord<>("k1", "0+A", null, 502L));
+            expected.add(new TestRecord<>("k1", "0+A+A", null, 502L));
+            expected.add(new TestRecord<>("k1", "0+A+B", null, 503L));
+            expected.add(new TestRecord<>("k1", "0+B", null, 503L));
+            expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L));
+            expected.add(new TestRecord<>("k2", "0+A+B", null, 503L));
+            expected.add(new TestRecord<>("k2", "0+B", null, 503L));
+            expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L));
+            expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L));
+            expected.add(new TestRecord<>("k2", "0+B+B", null, 504L));
+            expected.add(new TestRecord<>("k2", "0+B", null, 504L));
+            expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L));
+            expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L));
+            expected.add(new TestRecord<>("k1", "0+B+B", null, 504L));
+            expected.add(new TestRecord<>("k1", "0+B", null, 504L));
+            expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L));
+
+            assertEquals(expected, results);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index a65c481..c5c06c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
@@ -493,52 +494,59 @@ public class SuppressScenarioTest {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =
+                Comparator.comparing((TestRecord<String, Long> o) -> o.getKey())
+                    .thenComparing((TestRecord<String, Long> o) -> o.timestamp());
+
+            final List<TestRecord<String, Long>> actual = drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER);
+            actual.sort(comparator);
+            verify(
+                actual,
+                asList(
+                    // right window for k1@10 created when k1@11 is processed
+                    new KeyValueTimestamp<>("[k1@11/16]", 1L, 11L),
+                    // right window for k1@10 updated when k1@13 is processed
+                    new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
+                    // right window for k1@11 created when k1@13 is processed
+                    new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
+                    // left window for k1@24 created when k1@24 is processed
+                    new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L),
+                    // left window for k1@10 created when k1@10 is processed
+                    new KeyValueTimestamp<>("[k1@5/10]", 1L, 10L),
+                    // left window for k1@10 updated when k1@10 is processed
+                    new KeyValueTimestamp<>("[k1@5/10]", 2L, 10L),
+                    // left window for k1@10 updated when k1@10 is processed
+                    new KeyValueTimestamp<>("[k1@5/10]", 3L, 10L),
+                    // left window for k1@10 updated when k1@5 is processed
+                    new KeyValueTimestamp<>("[k1@5/10]", 4L, 10L),
+                    // left window for k1@10 updated when k1@7 is processed
+                    new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
+                    // left window for k1@11 created when k1@11 is processed
+                    new KeyValueTimestamp<>("[k1@6/11]", 2L, 11L),
+                    // left window for k1@11 updated when k1@10 is processed
+                    new KeyValueTimestamp<>("[k1@6/11]", 3L, 11L),
+                    // left window for k1@11 updated when k1@10 is processed
+                    new KeyValueTimestamp<>("[k1@6/11]", 4L, 11L),
+                    // left window for k1@11 updated when k1@7 is processed
+                    new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
+                    // left window for k1@13 created when k1@13 is processed
+                    new KeyValueTimestamp<>("[k1@8/13]", 4L, 13L),
+                    // left window for k1@13 updated when k1@10 is processed
+                    new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
+                    // right window for k1@90 created when k1@90 is processed
+                    new KeyValueTimestamp<>("[k1@85/90]", 1L, 90L)
+                )
+            );
             verify(
-                    drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                    asList(
-                            // left window for k1@10 created when k1@10 is processed
-                            new KeyValueTimestamp<>("[k1@5/10]", 1L, 10L),
-                            // right window for k1@10 created when k1@11 is processed
-                            new KeyValueTimestamp<>("[k1@11/16]", 1L, 11L),
-                            // left window for k1@11 created when k1@11 is processed
-                            new KeyValueTimestamp<>("[k1@6/11]", 2L, 11L),
-                            // left window for k1@10 updated when k1@10 is processed
-                            new KeyValueTimestamp<>("[k1@5/10]", 2L, 10L),
-                            // left window for k1@11 updated when k1@10 is processed
-                            new KeyValueTimestamp<>("[k1@6/11]", 3L, 11L),
-                            // right window for k1@10 updated when k1@13 is processed
-                            new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
-                            // right window for k1@11 created when k1@13 is processed
-                            new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
-                            // left window for k1@13 created when k1@13 is processed
-                            new KeyValueTimestamp<>("[k1@8/13]", 4L, 13L),
-                            // left window for k1@10 updated when k1@10 is processed
-                            new KeyValueTimestamp<>("[k1@5/10]", 3L, 10L),
-                            // left window for k1@11 updated when k1@10 is processed
-                            new KeyValueTimestamp<>("[k1@6/11]", 4L, 11L),
-                            // left window for k1@13 updated when k1@10 is processed
-                            new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
-                            // left window for k1@24 created when k1@24 is processed
-                            new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L),
-                            // left window for k1@10 updated when k1@5 is processed
-                            new KeyValueTimestamp<>("[k1@5/10]", 4L, 10L),
-                            // left window for k1@10 updated when k1@7 is processed
-                            new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
-                            // left window for k1@11 updated when k1@7 is processed
-                            new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
-                            new KeyValueTimestamp<>("[k1@85/90]", 1L, 90L)
-                            )
-            );
-            verify(
-                    drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                    asList(
-                            new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
-                            new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
-                            new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
-                            new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
-                            new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
-                            new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L)
-                            )
+                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
+                    new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
+                    new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
+                    new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
+                    new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
+                    new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L)
+                )
             );
         }
     }