Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/03 22:40:39 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481764391



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                     windowStartTimes.add(next.key.window().start());

Review comment:
       nit: `next` is not a great name; maybe `existingWindow` instead?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            windowStore.put(
+                key,
+                rightWinAgg,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());
+        }
+
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final long currentTimestamp) {
+            return currentTimestamp - windows.timeDifferenceMs() <= previousTimestamp;
+        }
+
+        // previous record's right window does not already exist and current record falls within previous record's right window
+        private boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes,
+                                                        final long previousRightWindowStart,
+                                                        final long currentRecordTimestamp) {
+            return !windowStartTimes.contains(previousRightWindowStart) && previousRightWindowStart + windows.timeDifferenceMs() >= currentRecordTimestamp;
         }
 
-        private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) {
-            return window.key.window().end() == window.value.timestamp();
+        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
+            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;

Review comment:
       Why do we need to check the timestamp?
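   
   For context, a minimal standalone sketch of what this comparison tests. It is not the PR's code: `RightWindowCheckSketch` and the `maxRecordTimestamp` parameter are hypothetical, and it assumes the aggregate's timestamp is the largest record timestamp in the window (as the `windowMaxRecordTimestamp` variable in this diff suggests).

```java
// Standalone sketch, not the PR's code: class and parameter names are hypothetical.
final class RightWindowCheckSketch {

    // A record at `timestamp` has a right window that starts at `timestamp + 1`.
    // `maxRecordTimestamp` is assumed to be the largest record timestamp held by the
    // aggregate being considered for that window (null if no aggregate was found).
    // Only a record strictly after `timestamp` can fall inside the right window.
    static boolean rightWindowIsNotEmpty(final Long maxRecordTimestamp, final long timestamp) {
        return maxRecordTimestamp != null && maxRecordTimestamp > timestamp;
    }

    public static void main(final String[] args) {
        // The aggregate holds a record at 12, which lies after the record at 10.
        System.out.println(rightWindowIsNotEmpty(12L, 10L)); // true
        // The newest record in the aggregate is at 10, so nothing lies to its right.
        System.out.println(rightWindowIsNotEmpty(10L, 10L)); // false
        // No aggregate was found at all.
        System.out.println(rightWindowIsNotEmpty(null, 10L)); // false
    }
}
```

   Under that assumption, the timestamp comparison is what separates "an aggregate exists" from "the aggregate contains a record later than the current one".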

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            windowStore.put(
+                key,
+                rightWinAgg,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());
+        }
+
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final long currentTimestamp) {
+            return currentTimestamp - windows.timeDifferenceMs() <= previousTimestamp;

Review comment:
       Why do we do this check based on timestamps?
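   
   To make the question concrete, here is a small standalone sketch of the arithmetic behind this check. It is an illustration only: `LeftWindowCheckSketch` is a hypothetical class, and `timeDifferenceMs` is passed explicitly instead of being read from `windows`. It also assumes `previousTimestamp <= currentTimestamp`.

```java
// Standalone sketch, not the PR's code: the class name and the explicit
// timeDifferenceMs parameter are hypothetical.
final class LeftWindowCheckSketch {

    // The left window of a record at `currentTimestamp` covers
    // [currentTimestamp - timeDifferenceMs, currentTimestamp].
    // Assuming previousTimestamp <= currentTimestamp, the previous record lies
    // inside that window exactly when it is not before the window start.
    static boolean leftWindowNotEmpty(final long previousTimestamp,
                                      final long currentTimestamp,
                                      final long timeDifferenceMs) {
        return currentTimestamp - timeDifferenceMs <= previousTimestamp;
    }

    public static void main(final String[] args) {
        // Left window is [15, 25]; the previous record at 20 is inside it.
        System.out.println(leftWindowNotEmpty(20L, 25L, 10L)); // true
        // Left window is [40, 50]; the previous record at 20 is before its start.
        System.out.println(leftWindowNotEmpty(20L, 50L, 10L)); // false
    }
}
```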

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].

Review comment:
       Not sure I understand the last sentence: `This will always be true for early records, as they all fall within [0, timeDifferenceMs]`?
   
   Also not sure how this comment relates to setting `previousRecordTimestamp`.
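   
   As background for the `[0, timeDifferenceMs]` remark, a rough sketch of the window an early record lands in, based on the `processEarly` javadoc. The names here (`EarlyRecordSketch`, `isEarly`, `windowFor`) are hypothetical, and treating every timestamp below `timeDifferenceMs` as early is an assumption, since the dispatch condition is not part of this hunk.

```java
// Standalone sketch, not the PR's code: all names here are hypothetical.
final class EarlyRecordSketch {

    // Assumption: a record is "early" when its regular left window
    // [timestamp - timeDifferenceMs, timestamp] would start before 0.
    static boolean isEarly(final long timestamp, final long timeDifferenceMs) {
        return timestamp < timeDifferenceMs;
    }

    // Returns {start, end} of the window the record falls into as described in the
    // javadoc: the combined window [0, timeDifferenceMs] for early records,
    // otherwise the record's own left window.
    static long[] windowFor(final long timestamp, final long timeDifferenceMs) {
        if (isEarly(timestamp, timeDifferenceMs)) {
            return new long[] {0L, timeDifferenceMs};
        }
        return new long[] {timestamp - timeDifferenceMs, timestamp};
    }

    public static void main(final String[] args) {
        final long[] early = windowFor(3L, 10L);
        System.out.println(early[0] + ", " + early[1]); // 0, 10
        final long[] regular = windowFor(25L, 10L);
        System.out.println(regular[0] + ", " + regular[1]); // 15, 25
    }
}
```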

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            windowStore.put(
+                key,
+                rightWinAgg,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());
+        }
+
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final long currentTimestamp) {
+            return currentTimestamp - windows.timeDifferenceMs() <= previousTimestamp;
+        }
+
+        // previous record's right window does not already exist and current record falls within previous record's right window
+        private boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes,
+                                                        final long previousRightWindowStart,
+                                                        final long currentRecordTimestamp) {

Review comment:
       Same question as above about `currentRecordTimestamp`. (It seems best to me to use the same variable name for the same thing throughout all methods.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                final TimeWindow window = new TimeWindow(previousRecordTimestamp + 1, previousRecordTimestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            windowStore.put(
+                key,
+                rightWinAgg,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());
+        }
+
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final long currentTimestamp) {

Review comment:
       q: Should we rename `currentTimestamp` -> `timestamp` for consistency? (Or maybe rename `timestamp` to `inputRecordTimestamp` all over the place?)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();

Review comment:
       nit: flip both lines:
   ```
   final long startTime = next.key.window().start();
   windowStartTimes.add(startTime);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -228,13 +345,8 @@ private void putAndForward(final Window window,
             if (windowEnd > closeTime) {
                 //get aggregate from existing window
                 final Agg oldAgg = getValueOrNull(valueAndTime);
-                final Agg newAgg;
-                // keep old aggregate if adding a right window, else add new record's value
-                if (windowStart == timestamp + 1) {
-                    newAgg = oldAgg;
-                } else {
-                    newAgg = aggregator.apply(key, value, oldAgg);
-                }
+                final Agg newAgg = aggregator.apply(key, value, oldAgg);

Review comment:
       Not sure how this change relates to "early records"?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]

Review comment:
       nit: `Instead, they will fall within the [0, timeDifferenceMs]` -> `Instead, we will put them into the [0, timeDifferenceMs] window as a "workaround",`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);

Review comment:
       nit: should we rename `putAndForward` to `updateWindowAndForward`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {

Review comment:
       Seems this can be a simple `else`? No need to verify the condition? (Maybe it's helpful to add a comment instead?)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs]
+         * window, and we will update or create their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            // A window from [0, timeDifferenceMs] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),

Review comment:
       Given that we call `processEarly` only if `0 < timestamp < timeDifferenceMs`, we know that `timestamp - 2 * windows.timeDifferenceMs()` would always be negative? Thus, we can just pass in zero here?
   
   If this is correct, we might want to add a check at the beginning of this method:
   ```
   if (timestamp < 0 || timestamp >= timeDifferenceMs) {
     throw new IllegalArgumentException("...");
   }
   ```
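
A minimal sketch of the reasoning above, not the PR's code. It assumes a hypothetical `EarlyRecordBounds` class and a plain `long timeDifferenceMs` standing in for `windows.timeDifferenceMs()`. It only illustrates that, once the early-record range is enforced, the lower bound computed in the diff is always zero:

```java
public class EarlyRecordBounds {

    // Hypothetical helper: enforces the precondition proposed in the check above.
    static void requireEarly(final long timestamp, final long timeDifferenceMs) {
        if (timestamp < 0 || timestamp >= timeDifferenceMs) {
            throw new IllegalArgumentException(
                "Expected timestamp in [0, " + timeDifferenceMs + "), got " + timestamp);
        }
    }

    // Lower bound of the fetch range as it is currently written in the diff.
    static long fetchLowerBound(final long timestamp, final long timeDifferenceMs) {
        return Math.max(0, timestamp - 2 * timeDifferenceMs);
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10L; // arbitrary value chosen for illustration
        for (long timestamp = 0; timestamp < timeDifferenceMs; timestamp++) {
            requireEarly(timestamp, timeDifferenceMs);
            // timestamp - 2 * timeDifferenceMs is negative here, so max(0, ...) is 0.
            if (fetchLowerBound(timestamp, timeDifferenceMs) != 0L) {
                throw new AssertionError("unexpected non-zero lower bound");
            }
        }
        System.out.println("lower bound is 0 for every early timestamp");
    }
}
```

If that holds, passing a literal `0` would make the intent more obvious than the `Math.max` expression.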

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                     windowStartTimes.add(next.key.window().start());
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
                     } else if (endTime > timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {

Review comment:
       Seems we don't need the `if` as this is the only valid case? Adding a comment might be helpful.
   
   If you want to keep the `if` as a sanity check, we should add a final:
   ```
   } else {
     throw new IllegalStateException(...);
   }
   ```
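
To make the "sanity check" variant concrete, here is a rough sketch under stated assumptions: `WindowClassifier`, its `Position` enum, and the exception message are hypothetical names for illustration, and `timeDifferenceMs` is passed as a plain `long`. It mirrors the branch order in `processInOrder` and treats any window start beyond `timestamp + 1` as a bug, since the fetch range should not return such windows:

```java
public class WindowClassifier {

    // Hypothetical labels for the four cases handled in processInOrder.
    enum Position { ENDS_BEFORE_RECORD, LEFT_WINDOW, CONTAINS_RECORD, RIGHT_WINDOW }

    static Position classify(final long startTime, final long timestamp, final long timeDifferenceMs) {
        final long endTime = startTime + timeDifferenceMs;
        if (endTime < timestamp) {
            return Position.ENDS_BEFORE_RECORD;
        } else if (endTime == timestamp) {
            return Position.LEFT_WINDOW;
        } else if (startTime <= timestamp) {
            // endTime > timestamp is implied by the two branches above
            return Position.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return Position.RIGHT_WINDOW;
        } else {
            // The store is fetched only up to timestamp + 1, so this should be unreachable.
            throw new IllegalStateException(
                "Unexpected window start " + startTime + " for record timestamp " + timestamp);
        }
    }

    public static void main(final String[] args) {
        System.out.println(classify(21L, 20L, 10L)); // window starting right after the record
    }
}
```

Either form works; the explicit `else` with an exception just fails loudly if the fetch range ever changes.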

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+                if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) {
                     valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
                 } else {
                     valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 }
                 final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createRightWindow(timestamp, rightWinAgg, key, value, closeTime);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (next.value.timestamp() < timestamp) {
+                            previousRecordTimestamp = next.value.timestamp();
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {

Review comment:
       Not sure if I can follow. If `combinedWindow.value.timestamp() > timestamp` this seems to imply that the current record is out-of-order? Above, you state that for the out-of-order case, the right window would always exist already though? So why do we need this additional check?



