Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/01 16:16:03 UTC

[GitHub] [kafka] lct45 opened a new pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

lct45 opened a new pull request #9239:
URL: https://github.com/apache/kafka/pull/9239


   Adding a `backwardFetch` call to the window store for sliding windows processing. While the implementation works with the forward call to the window store, using `backwardFetch` allows the iterator to be closed earlier, making the implementation more efficient.
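
To illustrate why a backward scan can close the iterator earlier, here is a minimal sketch. It does not use the Kafka Streams API: a `TreeMap` keyed by window start time stands in for the window store, and the class and method names (`BackwardScanSketch`, `visitedBackward`, `visitedForward`) are hypothetical. It assumes, as in the diff for this PR, that the scan covers start times from `timestamp - 2 * timeDifference` to `timestamp + 1` and can stop at the first window whose end time is before the record.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class BackwardScanSketch {
    // Hypothetical stand-in for a window store: window start time -> aggregate.
    static NavigableMap<Long, String> sampleStore() {
        final NavigableMap<Long, String> store = new TreeMap<>();
        store.put(0L, "w0");
        store.put(100L, "w100");
        store.put(499L, "w499");
        store.put(501L, "w501");
        return store;
    }

    // Scans from the latest start time down and stops at the first window
    // that ends before the record; returns how many entries were visited.
    static int visitedBackward(final NavigableMap<Long, String> store,
                               final long timeDifferenceMs,
                               final long recordTimestamp) {
        final long from = Math.max(0L, recordTimestamp - 2 * timeDifferenceMs);
        final long to = recordTimestamp + 1;
        int visited = 0;
        for (final long start : store.subMap(from, true, to, true).descendingKeySet()) {
            visited++;
            if (start + timeDifferenceMs < recordTimestamp) {
                break; // everything earlier also ends before the record
            }
        }
        return visited;
    }

    // A forward scan over the same range has no early stopping point,
    // because the windows closest to the record come last.
    static int visitedForward(final NavigableMap<Long, String> store,
                              final long timeDifferenceMs,
                              final long recordTimestamp) {
        final long from = Math.max(0L, recordTimestamp - 2 * timeDifferenceMs);
        final long to = recordTimestamp + 1;
        return store.subMap(from, true, to, true).size();
    }
}
```

With the sample store, a time difference of 500 and a record at 999, the backward scan visits three entries and the forward scan visits all four. The gap would grow with the number of older windows in the range.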
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486753195



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the window's "event time", i.e., the maximum timestamp of a record in the window. But in retrospect I don't see any correctness issues here: for one thing, as I mentioned in the other comment, this only affects the relative ordering of updates with different windowed keys, and there are no ordering guarantees between keys. It also shouldn't have any impact on advancing stream-time and potentially dropping some windows due to grace.







[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
         assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
         assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
     }
-}
\ No newline at end of file
+
+    private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment:
       Beautiful. Thanks!

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Ah, I belatedly realized what I think was @ableegoldman's concern. Suppress cares about the timestamps of the records, not the window start times. Since the timestamps of the windowed aggregation results are determined by the input record, not the window start times, all window agg updates that get forwarded happen "at the same time", right?
   
   If that's true, then the order in which we forward them doesn't matter.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       FWIW, I agree. The result of this aggregation is a KTable, i.e., a changelog stream. The semantics of the changelog stream only state that each record represents the latest state for that record's key. Just like the caches are free to drop arbitrary intermediate updates and KIP-557 is free to drop arbitrary idempotent updates, a processor that forwards updates for multiple different keys should be free to do it in any order.
   
   In fact, I might go so far as to suggest that a proper behavioral test would load all the results into a map so that the test isn't sensitive to meaningless changes like this. But I won't go quite that far because it seems good to have the opportunity to ask questions like @ableegoldman's, just to be sure nothing unexpected is happening when we change things later.
   
   I also think it's better for performance not to try and buffer the window results and emit them in "forward" order, since it might be an arbitrarily large number of updates for us to keep in memory.
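
The map-based comparison mentioned in this comment could look roughly like the sketch below. It is only an illustration and not code from the PR: plain `String` keys stand in for windowed keys, and `LatestPerKeySketch`, `update`, and `latestPerKey` are hypothetical names. The idea is to fold the forwarded updates into "latest value per key", which is what the changelog semantics define, and compare those maps instead of the raw emission order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LatestPerKeySketch {
    // Convenience factory for one (windowed key, count) update.
    static Map.Entry<String, Long> update(final String windowedKey, final long count) {
        return new SimpleEntry<>(windowedKey, count);
    }

    // Folds a changelog-style sequence into the latest value per key.
    // Later updates for the same key overwrite earlier ones.
    static Map<String, Long> latestPerKey(final List<Map.Entry<String, Long>> updates) {
        final Map<String, Long> latest = new HashMap<>();
        for (final Map.Entry<String, Long> entry : updates) {
            latest.put(entry.getKey(), entry.getValue());
        }
        return latest;
    }
}
```

Two emission orders that differ only in how updates for different windows are interleaved produce equal maps, so an assertion on the map would not break when the iteration direction changes. The trade-off noted in the comment still applies: such a test no longer surfaces ordering changes for discussion.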

##########
File path: checkstyle/suppressions.xml
##########
@@ -203,6 +203,9 @@
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
+    <suppress checks="MethodLength"
+              files="KStreamSlidingWindowAggregateTest.java"/>

Review comment:
       It seems like resolving @ableegoldman's comment (https://github.com/apache/kafka/pull/9239/files#r486627464) would make this unnecessary.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       It might matter for emit-on-full suppress buffers (if they actually get full), but even then, I think it's equally correct either way, so I don't think we need to be concerned.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder

Review comment:
       Yes, this would be better. Not sure if it helps, but for reference, this is what we did in `org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinMaterializationIntegrationTest#getTopology`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,32 +221,67 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final long startTime = windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass

Review comment:
       Yeah, it looks pretty straightforward without the comment. If anything would deserve clarification in a comment, it would be a reminder that we do _not_ add the current record to the right-hand window. Then again, it's pretty fundamental to the algorithm.
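
As a reminder of why the current record never lands in its own right-hand window, the window bounds can be sketched as follows. This is an illustration only: `SlidingWindowBoundsSketch` and its methods are hypothetical names, and it assumes the record timestamp is at least the time difference (records earlier than that are not covered here). The bounds follow the pattern visible in the diff and the test data, e.g. a right window starting at `previousRecordTimestamp + 1`.

```java
class SlidingWindowBoundsSketch {
    // Left window of a record: ends at the record, so it includes the record.
    static long[] leftWindow(final long recordTimestamp, final long timeDifferenceMs) {
        return new long[] {recordTimestamp - timeDifferenceMs, recordTimestamp};
    }

    // Right window of a record: starts one millisecond after the record,
    // so the record itself is never part of it.
    static long[] rightWindow(final long recordTimestamp, final long timeDifferenceMs) {
        final long start = recordTimestamp + 1;
        return new long[] {start, start + timeDifferenceMs};
    }

    // Both bounds are treated as inclusive in this sketch.
    static boolean contains(final long[] window, final long timestamp) {
        return window[0] <= timestamp && timestamp <= window[1];
    }
}
```

For a time difference of 500, a record at 999 gets the left window [499, 999], and a record at 500 gets the right window [501, 1001], matching the windows in the expected test results quoted elsewhere in this thread.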

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       The Materialized builder is notoriously vulnerable to "weird type nonsense" because it falls into a gap in Java's type inference system when you use chained methods. Let's see what happens when you implement @ableegoldman's earlier suggestion.
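
The inference gap described here can be reproduced without Kafka. In the sketch below, `Config` is a hypothetical stand-in for a generic builder with a static factory and chained setters; it is not `Materialized` and makes no claim about its internals. It shows that a bare factory call picks up its type arguments from the assignment target, while the receiver of a chained call has no target type and needs an explicit type witness.

```java
class ChainedInferenceSketch {
    // Hypothetical generic builder, loosely shaped like a "named config" class.
    static final class Config<K, V> {
        final String name;
        final String valueFormat;

        private Config(final String name, final String valueFormat) {
            this.name = name;
            this.valueFormat = valueFormat;
        }

        static <K, V> Config<K, V> as(final String name) {
            return new Config<>(name, null);
        }

        Config<K, V> withValueFormat(final String valueFormat) {
            return new Config<>(name, valueFormat);
        }
    }

    static Config<String, Long> direct() {
        // K and V are inferred from the return type.
        return Config.as("store");
    }

    static Config<String, Long> chained() {
        // "return Config.as("store").withValueFormat("long");" would not compile:
        // the receiver of the chained call has no target type, so K and V are
        // inferred as Object and the result is a Config<Object, Object>.
        return Config.<String, Long>as("store").withValueFormat("long");
    }
}
```

This is why the test code in the diff writes the type witness explicitly before `as("topic-Canonized")` when chaining `withValueSerde`.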

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -68,7 +68,7 @@
 
     private volatile boolean open = false;
 
-    InMemoryWindowStore(final String name,
+    public InMemoryWindowStore(final String name,

Review comment:
       Thanks for the explanation. This sounds fine to me.







[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483147852



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record

Review comment:
       Okay, I tried to reduce the duplicate code, but it led to some methods with long parameter lists because Java can't pass variables by reference. Maybe I'm missing some way to make it cleaner, so let me know if it can be improved.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, i.e., updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing.
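
One way to make a test insensitive to inter-key ordering while still checking intra-key ordering is a stable sort by key and window start. The sketch below is an assumption about how that could be done, not the comparator used in this PR (which is truncated in these messages); `StableOrderSketch`, `Update`, and `normalize` are hypothetical names. Because `List.sort` is stable, updates with the same key and window start keep their original relative order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class StableOrderSketch {
    // One forwarded update: record key, window start time, and the new count.
    static final class Update {
        final String key;
        final long windowStart;
        final long count;

        Update(final String key, final long windowStart, final long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.count = count;
        }

        @Override
        public String toString() {
            return key + "@" + windowStart + "=" + count;
        }
    }

    // Returns a copy sorted by (key, window start). The sort is stable, so
    // successive updates to the same window stay in emission order.
    static List<Update> normalize(final List<Update> updates) {
        final List<Update> sorted = new ArrayList<>(updates);
        sorted.sort(Comparator.comparing((Update u) -> u.key)
                .thenComparingLong(u -> u.windowStart));
        return sorted;
    }
}
```

Normalizing the output of a forward-iterating and a backward-iterating run this way should give the same sequence as long as each window's own updates arrive in the same order, which is the property this comment identifies as the one that matters.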

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the window's "event time", i.e. the maximum timestamp of a record in the window. But in retrospect I don't see any correctness issues here: for one thing, as I mentioned in the other comment, this only affects the relative ordering of updates with different windowed keys, and there are no ordering guarantees between keys. It also shouldn't have any impact on advancing stream-time and potentially dropping some windows due to grace.
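
The "event time" rule described above can be sketched in a few lines. This is a minimal illustration, not the code from the PR: `WindowEventTimeSketch` and `updatedEventTime` are hypothetical names, and the assumption is simply that a window's forwarded timestamp is the larger of its stored timestamp and the incoming record's timestamp.

```java
final class WindowEventTimeSketch {

    // Hypothetical helper: the window keeps the maximum record timestamp it has seen,
    // so an out-of-order (older) record does not move the window's timestamp backwards.
    static long updatedEventTime(final long currentWindowTimestamp, final long recordTimestamp) {
        return Math.max(currentWindowTimestamp, recordTimestamp);
    }

    public static void main(final String[] args) {
        // Mirrors the KGroupedStreamImplTest data: window [499, 999] already holds A@999,
        // then A@600 arrives out of order and the forwarded timestamp stays at 999.
        System.out.println(updatedEventTime(999L, 600L));
        // An in-order record advances the window's timestamp.
        System.out.println(updatedEventTime(600L, 700L));
    }
}
```

Because the timestamp only depends on the records in the window, it is the same whichever direction the store is scanned in.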

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, there's no need to force a particular inter-key ordering. The only ordering that would change is that of updates to windows with different start times, which was arbitrary to begin with. The ordering that does matter (intra-key ordering, i.e. updates with the same key and window start time) isn't affected. Final results still come last, which is the important thing.








[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485620554



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Yeah, it is just sorting the windows by start time. Because the reverse iterator looks at windows in a different order, the intermediate windows aren't updated in the same order as they are with the in-order iterator. I don't think it should matter for suppress, because the main list of output in this test is actually the raw output; it looks like the suppressed output didn't change with the new ordering.
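
A minimal, dependency-free sketch of the idea above: sort the captured output by key and window start before asserting, so the check passes whichever direction the store was scanned in. `Update` is a hypothetical stand-in for `KeyValueTimestamp<Windowed<String>, Long>`, and the comparator here is an assumption about what the test's comparator does, not a copy of it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class WindowStartSortSketch {

    // Hypothetical stand-in for KeyValueTimestamp<Windowed<String>, Long>.
    static final class Update {
        final String key;
        final long windowStart;
        final long count;

        Update(final String key, final long windowStart, final long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.count = count;
        }

        @Override
        public String toString() {
            return key + "@" + windowStart + "=" + count;
        }
    }

    // Orders by key, then by window start. List.sort is stable, so successive
    // updates to the same windowed key keep their relative emission order.
    static final Comparator<Update> BY_KEY_THEN_START =
        Comparator.comparing((Update u) -> u.key).thenComparingLong(u -> u.windowStart);

    // Returns a sorted, printable copy without mutating the input list.
    static List<String> sorted(final List<Update> updates) {
        final List<Update> copy = new ArrayList<>(updates);
        copy.sort(BY_KEY_THEN_START);
        final List<String> out = new ArrayList<>();
        for (final Update u : copy) {
            out.add(u.toString());
        }
        return out;
    }

    public static void main(final String[] args) {
        // Same three window updates, emitted in forward and in reverse window order.
        final List<Update> forward = Arrays.asList(
            new Update("1", 499L, 3L), new Update("1", 501L, 2L), new Update("1", 601L, 1L));
        final List<Update> reverse = Arrays.asList(
            new Update("1", 601L, 1L), new Update("1", 501L, 2L), new Update("1", 499L, 3L));
        System.out.println(sorted(forward).equals(sorted(reverse))); // prints "true"
    }
}
```

The stable sort matters here: updates to one window are never reordered relative to each other, so the intra-key ordering that reviewers care about is still verified.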







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483310806



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }

Review comment:
       Hm, yeah, that makes sense to me. Nice! I guess if we wanted to do something similar for the forward and early case, we would have to store a boolean. Not sure whether it's worth it; your call.
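
For readers following the hunk above, here is a rough, dependency-free sketch of why a reverse scan can stop early. It is a simplification under stated assumptions: a `TreeSet` of window start times stands in for the window store, `visitedForward` and `visitedReverse` are hypothetical helpers that only count visited windows, and none of the aggregation logic from `processReverse` is reproduced.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

final class ReverseScanSketch {

    // Forward scan: every stored window in the fetch range is visited.
    static int visitedForward(final NavigableSet<Long> windowStarts,
                              final long timestamp,
                              final long timeDifferenceMs) {
        final long from = Math.max(0, timestamp - 2 * timeDifferenceMs);
        return windowStarts.subSet(from, true, timestamp + 1, true).size();
    }

    // Reverse scan: walk from the newest window back and stop at the first window
    // that ends before the record, since nothing older needs to be updated.
    static int visitedReverse(final NavigableSet<Long> windowStarts,
                              final long timestamp,
                              final long timeDifferenceMs) {
        final long from = Math.max(0, timestamp - 2 * timeDifferenceMs);
        int visited = 0;
        for (final long start : windowStarts.subSet(from, true, timestamp + 1, true).descendingSet()) {
            visited++;
            final long endTime = start + timeDifferenceMs;
            if (endTime < timestamp) {
                break;
            }
        }
        return visited;
    }

    public static void main(final String[] args) {
        // Illustrative window start times with a time difference of 10 ms.
        final NavigableSet<Long> starts = new TreeSet<>(Arrays.asList(0L, 3L, 5L, 12L, 16L));
        System.out.println("forward=" + visitedForward(starts, 20L, 10L)
            + " reverse=" + visitedReverse(starts, 20L, 10L)); // prints "forward=5 reverse=3"
    }
}
```

In this toy data the reverse scan touches three of the five windows in range; how much that saves in practice would depend on the store and the data.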







[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483152129



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -608,10 +615,13 @@ public void testAggregateRandomInput() {
 
     private void verifyRandomTestResults(final Map<Long, ValueAndTimestamp<String>> actual) {

Review comment:
       Yeah, I think running on both is definitely good to have. Along those lines, should the benchmark run with both reverse and forward? It could indicate whether reverse is actually more efficient or whether they run about the same.







[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-690843020









[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487170646



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       This is how I typically break up ternaries. 
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier = 
               inOrderIterator 
                   ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) 
                   : Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
   ```







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485266653



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##########
@@ -246,4 +238,31 @@ private void assertOutputKeyValueTimestamp(final TestOutputTopic<Windowed<String
         final TestRecord<String, String> testRecord = new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
         assertThat(nonWindowedRecord, equalTo(testRecord));
     }
+
+    private void assertOutputKeyValueNotOrdered(final Set<TestRecord<String, String>> results) {

Review comment:
       This makes it sound like you want to assert that the output is not ordered, which I don't think is the point here?
   
   Also, since you're only calling this from one place and are asserting a specific output that corresponds to a specific test, I would just inline this check in the test instead of moving it out to a new method
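
One way to express "same records, any order" inline, sketched without the Kafka test classes: compare the results as sets. `OrderInsensitiveCheckSketch` and `sameElementsIgnoringOrder` are hypothetical names, and plain strings stand in for `TestRecord<String, String>`. Note the assumption that duplicates do not matter, since converting to a set drops them.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class OrderInsensitiveCheckSketch {

    // Returns true when both lists hold the same distinct elements, in any order.
    // Converting to sets also ignores duplicates, which may or may not be acceptable for a given test.
    static boolean sameElementsIgnoringOrder(final List<String> expected, final List<String> actual) {
        final Set<String> expectedSet = new HashSet<>(expected);
        final Set<String> actualSet = new HashSet<>(actual);
        return expectedSet.equals(actualSet);
    }

    public static void main(final String[] args) {
        final List<String> expected = Arrays.asList("k1@0", "k1@5", "k2@0");
        final List<String> actual = Arrays.asList("k2@0", "k1@5", "k1@0");
        System.out.println(sameElementsIgnoringOrder(expected, actual)); // prints "true"
    }
}
```

A name along the lines of "ignoring order" also avoids the reading that the output is asserted to be unordered.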

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Why this change?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record

Review comment:
       Yeah, that's definitely a pretty long list of input parameters. I don't think that's necessarily a problem, but if you feel it's cleaner to just inline the window creation, then go for it. 
   
   Duplicate code is not the end of the world. The only risk is that we might need to change the window creation logic and only do it in one place but not the other, but that's probably a low risk here. So just make a call 🙂 

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Is this change just sorting the output by window start time? Why do that vs. verifying that the output is in a specific order? In general the output order may not matter much but it does seem important to verify for suppress (I think 🤷‍♀️ )

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -161,60 +205,231 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                     windowStartTimes.add(next.key.window().start());
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
-                        if (isLeftWindow(next)) {
-                            latestLeftTypeWindow = next.key.window();
-                        }
+                        // update to store the previous record
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
                     } else if (endTime > timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    } else {
+                        throw new IllegalStateException("Unexpected window found when processing sliding windows");
+                    }
+                }
+            }
+            createWindows(key, value, timestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
+
+        public void processReverse(final K key,
+                                   final V value,
+                                   final long timestamp,
+                                   final long closeTime,
+                                   final Set<Long> windowStartTimes,
+                                   ValueAndTimestamp<Agg> leftWinAgg,
+                                   ValueAndTimestamp<Agg> rightWinAgg,
+                                   boolean leftWinAlreadyCreated,
+                                   boolean rightWinAlreadyCreated,
+                                   Long previousRecordTimestamp) {
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+                    if (startTime == timestamp + 1) {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    } else if (endTime > timestamp) {
+                        if (rightWinAgg == null) {
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        leftWinAlreadyCreated = true;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        } else {
+                            return;
+                        }
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
                     } else {
+                        throw new IllegalStateException("Unexpected window found when processing sliding windows");
+                    }
+                }
+            }
+            createWindows(key, value, timestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRecordRightWindow(previousRecordTimestamp + 1, timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
+            }
+
+        }
+
+        private void createWindows(final K key,
+                                   final V value,
+                                   final long timestamp,

Review comment:
       nit: rename to `inputRecordTimestamp` to be consistent with the naming in the early records PR
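
For readers following the `processReverse` hunk above, the early exit that a newest-first scan allows can be sketched with plain collections. This is an illustration under assumptions, not the PR's implementation: `ReverseScanSketch` is a hypothetical name, a `TreeSet` of window start times stands in for the window store, and the range is treated as inclusive on both ends.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

final class ReverseScanSketch {
    // Counts the windows a newest-first scan inspects before it can stop.
    static int visitedBackward(final NavigableSet<Long> windowStarts,
                               final long timestamp,
                               final long timeDifferenceMs) {
        final long from = Math.max(0, timestamp - 2 * timeDifferenceMs);
        final long to = timestamp + 1; // also covers the record's own right window
        int visited = 0;
        for (final long start : windowStarts.subSet(from, true, to, true).descendingSet()) {
            visited++;
            final long end = start + timeDifferenceMs;
            if (end < timestamp) {
                // First window ending before the record: it would supply the
                // left-window aggregate, so older windows need not be read.
                break;
            }
        }
        return visited;
    }

    // A forward scan over the same range has to read every window in it.
    static int visitedForward(final NavigableSet<Long> windowStarts,
                              final long timestamp,
                              final long timeDifferenceMs) {
        final long from = Math.max(0, timestamp - 2 * timeDifferenceMs);
        return windowStarts.subSet(from, true, timestamp + 1, true).size();
    }

    public static void main(final String[] args) {
        final NavigableSet<Long> starts =
            new TreeSet<>(Arrays.asList(5L, 8L, 12L, 15L, 20L, 25L, 31L));
        System.out.println(visitedBackward(starts, 30L, 10L)); // prints 4
        System.out.println(visitedForward(starts, 30L, 10L));  // prints 5
    }
}
```

The sketch only models the `break` on the first window with `endTime < timestamp`; the additional `return` in the `endTime == timestamp` branch of the diff is not modeled.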







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, i.e. updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing.
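
One way to express "intra-key order matters, inter-key order does not" in a test is to group updates by windowed key and compare the per-key sequences. The sketch below is only an illustration: `KeyedUpdate` and `IntraKeyOrder` are hypothetical names, the windowed key is flattened to a string, and the sample values are made up rather than taken from the test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one forwarded update; not a Kafka Streams class.
final class KeyedUpdate {
    final String windowedKey; // record key plus window start, e.g. "1@499"
    final long value;

    KeyedUpdate(final String windowedKey, final long value) {
        this.windowedKey = windowedKey;
        this.value = value;
    }
}

final class IntraKeyOrder {
    // Groups values by windowed key, keeping each key's updates in arrival order.
    static Map<String, List<Long>> perKeySequences(final List<KeyedUpdate> updates) {
        final Map<String, List<Long>> sequences = new HashMap<>();
        for (final KeyedUpdate u : updates) {
            sequences.computeIfAbsent(u.windowedKey, k -> new ArrayList<>()).add(u.value);
        }
        return sequences;
    }

    public static void main(final String[] args) {
        // Same per-key sequences, different interleaving across windowed keys.
        final List<KeyedUpdate> a = Arrays.asList(
            new KeyedUpdate("1@499", 2L), new KeyedUpdate("1@501", 1L),
            new KeyedUpdate("1@499", 3L), new KeyedUpdate("1@501", 2L));
        final List<KeyedUpdate> b = Arrays.asList(
            new KeyedUpdate("1@501", 1L), new KeyedUpdate("1@499", 2L),
            new KeyedUpdate("1@501", 2L), new KeyedUpdate("1@499", 3L));
        System.out.println(perKeySequences(a).equals(perKeySequences(b))); // prints true
    }
}
```

Because `Map.equals` ignores entry order while `List.equals` does not, the comparison tolerates any interleaving between windowed keys but still fails if the updates for a single windowed key arrive in a different order.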

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the window's "event time", i.e. the maximum timestamp of a record in the window. But in retrospect I don't see any correctness issues here: for one thing, as I mentioned in the other comment, this only affects the relative ordering of updates with different windowed keys, and there are no ordering guarantees between keys. It also shouldn't even have any impact on advancing stream-time and potentially dropping some windows due to grace
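
As a small illustration of the "event time" described here, assuming the forwarded timestamp is simply the running maximum of the record timestamps in the window (`WindowEventTime` is a hypothetical helper, not code from the PR):

```java
final class WindowEventTime {
    // Timestamp forwarded with a window update: the largest record timestamp seen so far.
    static long forwardedTimestamp(final long windowMaxTimestamp, final long recordTimestamp) {
        return Math.max(windowMaxTimestamp, recordTimestamp);
    }

    public static void main(final String[] args) {
        long max = 999L;                     // window already holds a record at 999
        max = forwardedTimestamp(max, 600L); // out-of-order record at 600 arrives
        System.out.println(max);             // prints 999
    }
}
```

This matches the expected results quoted in the `KGroupedStreamImplTest` diff, where processing A@600 updates window [499, 999] but the update is still forwarded with timestamp 999.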

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, ie updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the window's "event time", i.e., the maximum timestamp of a record in the window. But in retrospect I don't see any correctness issues here: for one thing, as I mentioned in the other comment, this only affects the relative ordering of updates with different windowed keys, and there are no ordering guarantees between keys. It also shouldn't have any impact on advancing stream-time and potentially dropping some windows due to grace.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is among updates to windows with different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, i.e., updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing.








[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-690843020


   It looks like now that PR builds are working again, we can get back to ignoring flaky tests ;)
   
   ```
       Build / JDK 8 / kafka.server.LeaderElectionTest.testLeaderElectionAndEpoch
       Build / JDK 11 / org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
       Build / JDK 11 / kafka.api.DelegationTokenEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
       Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```
   
   None of those were related to this changeset.





[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       These lines ended up being pretty long but I wasn't sure how to best split them up. WDYT @ableegoldman ?







[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485621499



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##########
@@ -246,4 +238,31 @@ private void assertOutputKeyValueTimestamp(final TestOutputTopic<Windowed<String
         final TestRecord<String, String> testRecord = new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
         assertThat(nonWindowedRecord, equalTo(testRecord));
     }
+
+    private void assertOutputKeyValueNotOrdered(final Set<TestRecord<String, String>> results) {

Review comment:
       Yeah the name was a struggle, but I just put it all back into the main function so should be cleaner now







[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486673070



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I based the aggregate function on something else, so `topic-Canonized` must be from there. It seems like _generally_ `valueSerde` doesn't need to be set to string, so I removed it in most places. Changing to the custom window store made it complain about `Serdes.String` (it wanted an object), and it appears that we need a `Serdes.String` for the grace test, so I'll mess around with that one more to see how it can be run with both iterators.







[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
         assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
         assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
     }
-}
\ No newline at end of file
+
+    private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment:
       Beautiful. Thanks!

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Ah, I belatedly realized what I think was @ableegoldman's concern. Suppress cares about the timestamps of the records, not the window start times. Since the timestamp of the windowed aggregation results is determined by the input record, not the window start times, all window agg updates that get forwarded happen "at the same time", right?
   
   If that's true, then it doesn't matter the order we forward them in.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       FWIW, I agree. The result of this aggregation is a KTable, i.e., a changelog stream. The semantics of the changelog stream only state that each record represents the latest state for that record's key. Just like the caches are free to drop arbitrary intermediate updates and KIP-557 is free to drop arbitrary idempotent updates, a processor that forwards updates for multiple different keys should be free to do it in any order.
   
   In fact, I might go so far as to suggest that a proper behavioral test would load all the results into a map so that the test isn't sensitive to meaningless changes like this. But I won't go quite that far, because it seems good to have the opportunity to ask questions like @ableegoldman's, just to be sure nothing unexpected is happening when we change things later.
   
   I also think it's better for performance not to try and buffer the window results and emit them in "forward" order, since it might be an arbitrarily large number of updates for us to keep in memory.
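   
   As a rough illustration of the map-based idea above (a sketch only: `Update`, `latestPerWindowedKey`, and the string key format are hypothetical stand-ins, not classes from the Kafka test suite), a test could replay the forwarded updates into a map and compare final states rather than emission order:
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   final class LatestPerKeySketch {

       /** Hypothetical stand-in for one forwarded update: key, window start, aggregate value. */
       static final class Update {
           final String key;
           final long windowStart;
           final long value;

           Update(final String key, final long windowStart, final long value) {
               this.key = key;
               this.windowStart = windowStart;
               this.value = value;
           }
       }

       /** Replays updates in arrival order; a later update for the same (key, windowStart) overwrites the earlier one. */
       static Map<String, Long> latestPerWindowedKey(final List<Update> updates) {
           final Map<String, Long> latest = new HashMap<>();
           for (final Update update : updates) {
               latest.put(update.key + "@" + update.windowStart, update.value);
           }
           return latest;
       }

       public static void main(final String[] args) {
           // The same three window updates, forwarded in ascending and in descending window-start order.
           final List<Update> forward = Arrays.asList(
               new Update("1", 499L, 3L),
               new Update("1", 501L, 2L),
               new Update("1", 601L, 1L));
           final List<Update> reverse = Arrays.asList(
               new Update("1", 601L, 1L),
               new Update("1", 501L, 2L),
               new Update("1", 499L, 3L));

           // Both orders leave every windowed key with the same latest value.
           System.out.println(latestPerWindowedKey(forward).equals(latestPerWindowedKey(reverse)));
       }
   }
   ```
   
   Because a later update for the same windowed key overwrites the earlier one, a comparison like this stays sensitive to intra-key order while ignoring inter-key order.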

##########
File path: checkstyle/suppressions.xml
##########
@@ -203,6 +203,9 @@
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
+    <suppress checks="MethodLength"
+              files="KStreamSlidingWindowAggregateTest.java"/>

Review comment:
       It seems like resolving @ableegoldman's comment (https://github.com/apache/kafka/pull/9239/files#r486627464) would make this unnecessary.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       It might matter for emit-on-full suppress buffers (if they actually get full), but even then, I think it's equally correct either way, so I don't think we need to be concerned.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder

Review comment:
       Yes, this would be better. Not sure if it helps, but for reference, this is what we did in `org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinMaterializationIntegrationTest#getTopology`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,32 +221,67 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final long startTime = windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass

Review comment:
       Yeah, it looks pretty straightforward without the comment. If anything would deserve clarification in a comment, it would be a reminder that we do _not_ add the current record to the right-hand window. Then again, it's pretty fundamental to the algorithm.
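   
   For reference, a minimal sketch of the window bounds involved, assuming inclusive bounds on both ends and a record timestamp no smaller than the time difference. The class and method names here are made up for illustration and are not part of `KStreamSlidingWindowAggregate`:
   
   ```java
   final class SlidingWindowBoundsSketch {

       /** Start of the window that ends at the record's own timestamp (its "left" window). */
       static long leftWindowStart(final long recordTimestamp, final long timeDifferenceMs) {
           return recordTimestamp - timeDifferenceMs;
       }

       /** Start of the window that begins just after the record (its "right" window). */
       static long rightWindowStart(final long recordTimestamp) {
           return recordTimestamp + 1;
       }

       /** True if the timestamp falls inside [windowStart, windowStart + timeDifferenceMs]. */
       static boolean contains(final long windowStart, final long timeDifferenceMs, final long timestamp) {
           return timestamp >= windowStart && timestamp <= windowStart + timeDifferenceMs;
       }

       public static void main(final String[] args) {
           final long timeDifferenceMs = 500L;
           final long recordTimestamp = 500L;

           final long left = leftWindowStart(recordTimestamp, timeDifferenceMs);
           final long right = rightWindowStart(recordTimestamp);

           // The record belongs to its own left window [0, 500] ...
           System.out.println(contains(left, timeDifferenceMs, recordTimestamp));
           // ... but never to its own right window [501, 1001], which starts one millisecond later.
           System.out.println(contains(right, timeDifferenceMs, recordTimestamp));
       }
   }
   ```
   
   With a time difference of 500 ms this matches the `TimeWindow(0L, 500L)` and `TimeWindow(501L, 1001L)` pairs in the test expectations for a record at 500.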

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       The Materialized builder is notoriously vulnerable to "weird type nonsense" because it falls into a gap in Java's type inference system when you use chained methods. Let's see what happens when you implement @ableegoldman 's earlier suggestion.
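   
   A small self-contained sketch of that inference gap, using a made-up `Builder` class rather than the real `Materialized` API (all names below are hypothetical):
   
   ```java
   final class ChainedInferenceSketch {

       /** Hypothetical two-parameter builder, loosely shaped like a generic static factory plus chained setters. */
       static final class Builder<K, V> {
           private final String name;
           private String valueFormat = "unset";

           private Builder(final String name) {
               this.name = name;
           }

           static <K, V> Builder<K, V> as(final String name) {
               return new Builder<>(name);
           }

           Builder<K, V> withValueFormat(final String valueFormat) {
               this.valueFormat = valueFormat;
               return this;
           }

           String describe() {
               return name + ":" + valueFormat;
           }
       }

       /** The consumer fixes the type parameters, much as an aggregate call would. */
       static String aggregate(final Builder<String, Long> materialized) {
           return materialized.describe();
       }

       public static void main(final String[] args) {
           // Compiles: the argument position gives the compiler a target type, so K and V are inferred.
           System.out.println(aggregate(Builder.as("store")));

           // Does not compile if uncommented: the receiver of a chained call has no target type,
           // so as(...) is inferred as Builder<Object, Object> before withValueFormat is applied.
           // System.out.println(aggregate(Builder.as("store").withValueFormat("string")));

           // Compiles: explicit type witnesses on the factory carry through the chain.
           System.out.println(aggregate(Builder.<String, Long>as("store").withValueFormat("string")));
       }
   }
   ```
   
   This is why chained calls tend to need the explicit `<String, String, WindowStore<Bytes, byte[]>>` witness, while passing a single unchained expression often does not.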

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -68,7 +68,7 @@
 
     private volatile boolean open = false;
 
-    InMemoryWindowStore(final String name,
+    public InMemoryWindowStore(final String name,

Review comment:
       Thanks for the explanation. This sounds fine to me.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       This is how I typically break up ternaries. 
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier = 
               inOrderIterator 
                   ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) 
                   : Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       But I wouldn't be afraid to just use a full if/else block, either.
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier;
           if (inOrderIterator) {
               storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false);
           } else {
               storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
           }
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Yeah, this sounds right. There are two cases. Either the current record's timestamp is less than the event time for some of the windows, in which case it doesn't advance the partition's stream time. Or it is later than the (prior) event time for all the windows, in which case it does advance the stream time, but then all the updated windows' event times are equal to the current record's timestamp, which is also the new stream time, so that should also be ok for suppression.
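
   The two cases can be checked with a small sketch. This is a simplified model, not the Kafka Streams implementation: it assumes stream time is the maximum record timestamp observed so far and a window's event time is the maximum timestamp of the records in it. `SuppressionTimeSketch` is a hypothetical name.

   ```java
   final class SuppressionTimeSketch {
       // Simplified stream time: the largest record timestamp seen so far.
       long streamTime = Long.MIN_VALUE;

       // Adds a record to a window and returns the window's updated event time.
       long process(final long priorWindowEventTime, final long recordTimestamp) {
           streamTime = Math.max(streamTime, recordTimestamp);
           return Math.max(priorWindowEventTime, recordTimestamp);
       }
   }
   ```

   With records at 10, then 7, then 90 hitting a window whose event time starts at 10, the record at 7 leaves both stream time and event time at 10, while the record at 90 moves both to 90. In either case the forwarded timestamp never exceeds stream time.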







[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-691224736


   Oops:
   ```
   17:31:33  [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-9239/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java:215:5: Method length is 153 lines (max allowed is 150). [MethodLength]
   ```





[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487170646



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       This is how I typically break up ternaries. 
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier = 
               inOrderIterator 
                   ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) 
                   : Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       But I wouldn't be afraid to just use a full if/else block, either.
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier;
           if (inOrderIterator) {
               storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false);
           } else {
               storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
           }
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Yeah, this sounds right. There are two cases. Either the current record's timestamp is less than the event time for some of the windows, in which case it doesn't advance the partition's stream time. Or it is later than the (prior) event time for all the windows, in which case it does advance the stream time, but then all the updated windows' event times are equal to the current record's timestamp, which is also the new stream time, so that should also be ok for suppression.








[GitHub] [kafka] ableegoldman commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-689272887


   I think this is ready for @vvcephei to review, once the early records PR is merged and this one is rebased





[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r481518121



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +121,28 @@ public void process(final K key, final V value) {
             }
 
             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding window
-            if (timestamp < windows.timeDifferenceMs()) {
-                log.warn(
-                    "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
-                droppedRecordsSensor.record();
-                return;
+            Method backwardsIterator = null;
+
+            try {
+                windowStore.getClass().getMethod("backwardFetch", new Class[] { Object.class, Object.class, Instant.class, Instant.class });
+            } catch (NoSuchMethodException | SecurityException e)  { }
+            if (backwardsIterator != null) {
+                processReverse(key, value, timestamp);
+            } else {
+                processInOrder(key, value, timestamp);

Review comment:
       1) I'm not sure I understand the usage of `backwardsIterator` here. Do we ever set it to anything?
   2) I think you're overcomplicating this 🙂  All you need to do is call `windowStore.backwardFetch(...)`, and if the underlying store doesn't support it, it will throw `UnsupportedOperationException`. You don't need to use reflection/`getMethod`. Also, if we're ever in a position of catching `SecurityException`, something has probably gone wrong.
   3) Originally I was thinking we should do this in `init` so we don't have to figure out if it's a reverse store every time a new record gets processed. But I just realized that all of the SessionStore fetch methods require a key, so we have to do this in `process` (since we don't have a key to pass in during `init`, and null keys aren't allowed). We can at least just do it once in the first `process`, and then keep track of whether we should use forwards or reverse iteration in subsequent ones
   
   Given the above (especially 3), there's no perfect solution, but one thing we can do is just keep a `reverseIterationPossible` boolean. If it's false we call `processInOrder`, if it's true we call `processReverse`. We also put a `catch UnsupportedOperationException` around the `processReverse` call, so if it does throw on the first invocation of `process` then we can call `processInOrder` and also set `reverseIterationPossible` to false so that we never call `processReverse` again. Does that make sense?
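
   A minimal sketch of the fallback described above. `SketchStore` and `SlidingProcessorSketch` are hypothetical stand-ins, not Kafka Streams classes; the only behavior assumed of the real store is that an unsupported `backwardFetch` throws `UnsupportedOperationException`.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for a window store; not the Kafka Streams interface.
   interface SketchStore {
       List<Long> fetch(String key);          // window start times, ascending
       List<Long> backwardFetch(String key);  // descending; may be unsupported
   }

   final class SlidingProcessorSketch {
       private final SketchStore store;
       // Optimistically assume reverse iteration works until the store says otherwise.
       private boolean reverseIterationPossible = true;
       // Records which path handled each record, for illustration only.
       final List<String> calls = new ArrayList<>();

       SlidingProcessorSketch(final SketchStore store) {
           this.store = store;
       }

       void process(final String key) {
           if (reverseIterationPossible) {
               try {
                   processReverse(key);
                   return;
               } catch (final UnsupportedOperationException e) {
                   // Remember the result so later records skip the reverse attempt.
                   reverseIterationPossible = false;
               }
           }
           processInOrder(key);
       }

       private void processReverse(final String key) {
           store.backwardFetch(key);  // throws if the store cannot iterate backwards
           calls.add("reverse");
       }

       private void processInOrder(final String key) {
           store.fetch(key);
           calls.add("inOrder");
       }
   }
   ```

   This only works if the reverse path throws before it has any side effects, which holds in the sketch because the fetch is the first thing it does.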







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485927114



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -150,23 +152,46 @@ public void process(final K key, final V value) {
                 return;
             }
 
-            processInOrder(key, value, inputRecordTimestamp, closeTime);
-        }
-
-        public void processInOrder(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
+            if (reverseIteratorPossible == null) {
+                try {
+                    windowStore.backwardFetch(key, 0L, 0L);
+                    reverseIteratorPossible = true;
+                    log.debug("Sliding Windows aggregate using a reverse iterator");
+                } catch (final UnsupportedOperationException e)  {
+                    reverseIteratorPossible = false;
+                    log.debug("Sliding Windows aggregate using a forward iterator");
+                }
+            }
 
             final Set<Long> windowStartTimes = new HashSet<>();
 
             // aggregate that will go in the current record’s left/right window (if needed)
-            ValueAndTimestamp<Agg> leftWinAgg = null;
-            ValueAndTimestamp<Agg> rightWinAgg = null;
+            final ValueAndTimestamp<Agg> leftWinAgg = null;
+            final ValueAndTimestamp<Agg> rightWinAgg = null;
 
             //if current record's left/right windows already exist
-            boolean leftWinAlreadyCreated = false;
-            boolean rightWinAlreadyCreated = false;
+            final boolean leftWinAlreadyCreated = false;
+            final boolean rightWinAlreadyCreated = false;
 
-            Long previousRecordTimestamp = null;
+            final Long previousRecordTimestamp = null;

Review comment:
       Why create all of these here and then pass the uninitialized values into `processInOrder/processReverse`? If we only need them within the `processX` methods, let's just keep them there

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -184,6 +209,7 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
 
                     if (endTime < inputRecordTimestamp) {
                         leftWinAgg = windowBeingProcessed.value;
+                        // update to store the previous record

Review comment:
       This comment doesn't really add much

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,38 +231,69 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
-
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+        public void processReverse(final K key,
+                                   final V value,
+                                   final long inputRecordTimestamp,
+                                   final long closeTime,
+                                   final Set<Long> windowStartTimes,
+                                   ValueAndTimestamp<Agg> leftWinAgg,
+                                   ValueAndTimestamp<Agg> rightWinAgg,
+                                   boolean leftWinAlreadyCreated,
+                                   boolean rightWinAlreadyCreated,
+                                   Long previousRecordTimestamp) {
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final long startTime = windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    } else if (endTime > inputRecordTimestamp) {
+                        if (rightWinAgg == null) {
+                            rightWinAgg = windowBeingProcessed.value;
+                        }
+                        updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                    } else if (endTime == inputRecordTimestamp) {
+                        leftWinAlreadyCreated = true;
+                        updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                        if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        } else {
+                            return;
+                        }
+                    } else if (endTime < inputRecordTimestamp) {
+                        leftWinAgg = windowBeingProcessed.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        log.error(
+                            "Unexpected window with start {} found when processing record at {} in `KStreamSlidingWindowAggregate`.",
+                            startTime, inputRecordTimestamp
+                        );
+                        throw new IllegalStateException("Unexpected window found when processing sliding windows");
+                    }
                 }
-                final TimeWindow window = new TimeWindow(inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp);
-                updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-            }
-            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, inputRecordTimestamp)) {
-                createCurrentRecordRightWindow(inputRecordTimestamp, rightWinAgg, key);
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
         }
 
         /**
-         * Created to handle records where 0 < inputRecordTimestamp < timeDifferenceMs. These records would create
-         * windows with negative start times, which is not supported. Instead, we will put them into the [0, timeDifferenceMs]
-         * window as a "workaround", and we will update or create their right windows as new records come in later

Review comment:
       I think some things got accidentally changed/reverted during the rebase, eg this paragraph and the comment on line 250 at least

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -194,7 +220,7 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     } else if (endTime > inputRecordTimestamp && startTime <= inputRecordTimestamp) {
                         rightWinAgg = windowBeingProcessed.value;
                         updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
-                    } else if (startTime == inputRecordTimestamp + 1) {

Review comment:
       Was this a rebasing accident? Seems like using `startTime` is correct

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -327,6 +382,41 @@ private void processEarly(final K key, final V value, final long inputRecordTime
 
         }
 
+        private void createWindows(final K key,
+                                   final V value,
+                                   final long inputRecordTimestamp,
+                                   final long closeTime,
+                                   final Set<Long> windowStartTimes,
+                                   final ValueAndTimestamp<Agg> rightWinAgg,
+                                   final ValueAndTimestamp<Agg> leftWinAgg,
+                                   final boolean leftWinAlreadyCreated,
+                                   final boolean rightWinAlreadyCreated,
+                                   final Long previousRecordTimestamp) {
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (previousRecordRightWindowDoesNotExistAndIsNotEmpty(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
+                    createPreviousRecordRightWindow(previousRightWinStart, inputRecordTimestamp, key, value, closeTime);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
+                } else {
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+                }
+                final TimeWindow window = new TimeWindow(inputRecordTimestamp - windows.timeDifferenceMs(), inputRecordTimestamp);
+                updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
+            }

Review comment:
       super nit: put a line break before the right window creation block below







[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-691322853


   These test failures look unrelated:
   ```
       Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
       Build / JDK 11 / org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing[exactly_once]
       Build / JDK 8 / kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault
   ```





[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486751006



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 







[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483129715



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }

Review comment:
       Do we need a boolean? Or could we just return? If there's a record at the current record's timestamp, all we need to do is update the windows it falls within, and as we go back in time the earliest window it'll fall within is its left window, so if we find the left window _and_ the left window was created by a record at the same timestamp, we can just return after updating that window, right?
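
To make the early-return idea concrete, here is a minimal sketch of it. It is an illustration only, not the code in this PR: `Win`, `updateBackward`, and the list-based "store" are hypothetical stand-ins, and the bookkeeping for aggregates and right windows is left out. It assumes windows are visited from largest start time to smallest, as a backward fetch would return them.

```java
import java.util.List;

class BackwardEarlyReturnSketch {

    /** Hypothetical stand-in for a stored window: its start time and the max record timestamp in it. */
    static final class Win {
        final long start;
        final long maxRecordTimestamp;

        Win(final long start, final long maxRecordTimestamp) {
            this.start = start;
            this.maxRecordTimestamp = maxRecordTimestamp;
        }
    }

    /**
     * Visits windows newest-first, records the start of every window that contains
     * the record, and returns how many windows were visited before stopping.
     */
    static int updateBackward(final List<Win> newestFirst,
                              final long timestamp,
                              final long timeDifferenceMs,
                              final List<Long> updatedStarts) {
        int visited = 0;
        for (final Win w : newestFirst) {
            visited++;
            final long end = w.start + timeDifferenceMs;
            if (w.start <= timestamp && end >= timestamp) {
                updatedStarts.add(w.start);
                // The left window ends exactly at the record's timestamp. If a record
                // with the same timestamp already created it, nothing older matters.
                if (end == timestamp && w.maxRecordTimestamp == timestamp) {
                    return visited;
                }
            }
        }
        return visited;
    }
}
```

With a time difference of 10 and a record at timestamp 20, the loop stops as soon as it reaches the window starting at 10 if that window already holds a record at 20, so older windows are never read.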







[GitHub] [kafka] vvcephei merged pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9239:
URL: https://github.com/apache/kafka/pull/9239


   





[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485623786



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Because we're outputting all the intermediate data, the order of the internal windows isn't the same when you use the reverse iterator versus the in-order iterator. This is mainly an issue for large tests, but when there are a lot of windows to update as a new record comes in, the reverse iterator looks at them from largest start time to smallest start time, and the in-order iterator looks at them from smallest start time to largest start time. I suppose we could alter the algorithm to output results differently, but getting a different order didn't seem too harmful for users. WDYT?
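
As a rough illustration of why a comparator helps here, the sketch below sorts both result lists into one canonical order before comparing them, so the assertion no longer depends on iteration direction. The `Result` class and the sort key (key, then window start, then value) are assumptions made for this example; the comparator actually used in the PR's tests is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

class OrderInsensitiveResultsSketch {

    /** Hypothetical stand-in for a forwarded result (key, window start, value, timestamp). */
    static final class Result {
        final String key;
        final long windowStart;
        final long value;
        final long timestamp;

        Result(final String key, final long windowStart, final long value, final long timestamp) {
            this.key = key;
            this.windowStart = windowStart;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof Result)) {
                return false;
            }
            final Result other = (Result) o;
            return key.equals(other.key) && windowStart == other.windowStart
                && value == other.value && timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowStart, value, timestamp);
        }
    }

    // Assumed canonical order: key, then window start, then value.
    static final Comparator<Result> CANONICAL = Comparator
        .comparing((Result r) -> r.key)
        .thenComparingLong(r -> r.windowStart)
        .thenComparingLong(r -> r.value);

    /** Returns a sorted copy so the caller's list is left untouched. */
    static List<Result> sorted(final List<Result> results) {
        final List<Result> copy = new ArrayList<>(results);
        copy.sort(CANONICAL);
        return copy;
    }
}
```

A test would then compare `sorted(actual)` against `sorted(expected)` rather than the raw lists. The trade-off is that such a test no longer checks the relative order of updates across different windows.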







[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-691224736









[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486619734



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,32 +221,67 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store

Review comment:
       nit: `add 1 to upper bound to catch the current records'...` here and elsewhere, it's not totally clear what this comment is referring to

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,32 +221,67 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final long startTime = windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass

Review comment:
       nit: maybe you can just remove this comment, the code seems pretty explanatory. We can see what @vvcephei  thinks

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder

Review comment:
       Instead of specifying the whole thing for both cases, you could just create a 
   ```
   final WindowBytesStoreSupplier supplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier(...) : Stores.inMemoryWindowStore(...)
   ```
   and then pass that into the `Materialized` without having to list the whole topology out twice. 

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       Where does `topic-Canonized` come from? Also, if we need to set the ValueSerde to String here, then wouldn't we need to do so for the in-order case as well? Does that mean we don't actually need the `withValueSerde` thing here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -68,7 +68,7 @@
 
     private volatile boolean open = false;
 
-    InMemoryWindowStore(final String name,
+    public InMemoryWindowStore(final String name,

Review comment:
       Need to make this public for the child class used to parametrize the test so we continue to test both forward and reverse directions. The alternative would be to just create a new standalone `ForwardOnlyWindowStore` test utility class and stick it in the same package
   (btw, need to fix parameter alignment)







[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487174400



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Yeah, this sounds right. Either the current record's timestamp is less than the event time for some of the windows, in which case it doesn't advance the partition's stream time, or it is more advanced than the (prior) event time for all the windows, in which case it does advance the stream time, but all the updated windows' event times are equal to the current record's timestamp, which is also equal to the new stream time, which should also be ok for suppression.







[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       These lines ended up being pretty long but I wasn't sure how to best split them up. WDYT @ableegoldman ?







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r481518121



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +121,28 @@ public void process(final K key, final V value) {
             }
 
             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding window
-            if (timestamp < windows.timeDifferenceMs()) {
-                log.warn(
-                    "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
-                droppedRecordsSensor.record();
-                return;
+            Method backwardsIterator = null;
+
+            try {
+                windowStore.getClass().getMethod("backwardFetch", new Class[] { Object.class, Object.class, Instant.class, Instant.class });
+            } catch (NoSuchMethodException | SecurityException e)  { }
+            if (backwardsIterator != null) {
+                processReverse(key, value, timestamp);
+            } else {
+                processInOrder(key, value, timestamp);

Review comment:
       1) I'm not sure I understand the usage of `backwardsIterator` here. Do we ever set it to anything?
   2) I think you're overcomplicating this 🙂  All you need to do is call `windowStore.backwardFetch(...)` and if the underlying store doesn't support it, then it will throw UnsupportedOperationException. You don't need to use reflection/`getMethod`. Also, if we're ever in a position of catching SecurityException, something has probably gone wrong
   3) Originally I was thinking we should do this in `init` so we don't have to figure out if it's a reverse store on every iteration, but I just realized that all of the SessionStore fetch methods require a key, so we have to do this in `process`. We can at least just do it once in the first `process`, and then keep track of whether we should use forwards or reverse iteration in subsequent ones
   
   Given the above (especially 3), there's no perfect solution, but one thing we can do is just keep a `reverseIterationPossible` boolean. If it's false we call `processInOrder`, if it's true we call `processReverse`. We also put a `catch UnsupportedOperationException` around the `processReverse` call, so if it does throw on the first invocation of `process` then we can call `processInOrder` and also set `reverseIterationPossible` to false so that we never call `processReverse` again. Does that make sense?
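
   A minimal sketch of the `reverseIterationPossible` idea, assuming hypothetical stand-ins (`SketchStore`, `SketchProcessor`, `ForwardOnlyStore`) rather than the real `WindowStore` and processor classes. It only shows the control flow: try the reverse path, fall back once on UnsupportedOperationException, and never try the reverse path again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a window store; only the two calls used below are modeled.
interface SketchStore {
    List<Long> fetch(String key);
    List<Long> backwardFetch(String key); // may throw UnsupportedOperationException
}

// Hypothetical processor showing only the forward/reverse decision.
class SketchProcessor {
    private final SketchStore store;
    // Optimistically assume reverse iteration works until the store says otherwise.
    private boolean reverseIterationPossible = true;
    final List<String> trace = new ArrayList<>();

    SketchProcessor(final SketchStore store) {
        this.store = store;
    }

    void process(final String key) {
        if (reverseIterationPossible) {
            try {
                processReverse(key);
                return;
            } catch (final UnsupportedOperationException e) {
                // Remember the failure so later records skip the reverse path entirely.
                reverseIterationPossible = false;
            }
        }
        processInOrder(key);
    }

    private void processReverse(final String key) {
        store.backwardFetch(key); // first call, so a throw happens before any side effect
        trace.add("reverse:" + key);
    }

    private void processInOrder(final String key) {
        store.fetch(key);
        trace.add("inOrder:" + key);
    }
}

// A store that does not support reverse iteration and counts how often it was asked.
class ForwardOnlyStore implements SketchStore {
    int backwardCalls = 0;

    @Override
    public List<Long> fetch(final String key) {
        return new ArrayList<>();
    }

    @Override
    public List<Long> backwardFetch(final String key) {
        backwardCalls++;
        throw new UnsupportedOperationException("backwardFetch not supported");
    }
}
```

   One caveat with this shape: the fallback is only safe if the exception is thrown before `processReverse` has put or forwarded anything, which holds in the sketch because `backwardFetch` is the first call.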







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, ie updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing
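
   One way a test could compare results without pinning the inter-window order, sketched with a hypothetical `Update` class and `normalize` helper (this is not necessarily what the comparator in the PR does). A stable sort by key and window start reorders updates across windows but keeps updates to the same window in their original sequence.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a forwarded result: key, window start, running count.
class Update {
    final String key;
    final long windowStart;
    final long count;

    Update(final String key, final long windowStart, final long count) {
        this.key = key;
        this.windowStart = windowStart;
        this.count = count;
    }

    @Override
    public String toString() {
        return key + "@" + windowStart + "=" + count;
    }
}

class OrderInsensitiveSketch {
    // Sorts by (key, window start) only; the count is deliberately not part of the comparator.
    static List<String> normalize(final List<Update> updates) {
        final List<Update> sorted = new ArrayList<>(updates);
        // List.sort on an ArrayList is stable, so updates to the same window keep their relative order.
        sorted.sort(Comparator.comparing((Update u) -> u.key).thenComparingLong(u -> u.windowStart));
        final List<String> out = new ArrayList<>();
        for (final Update u : sorted) {
            out.add(u.toString());
        }
        return out;
    }
}
```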







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486684145



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       It should ultimately be the same for both iterators, but there might be some weird type nonsense going on. These problems should go away if you go with the approach of just setting a `StoreSupplier` based on `inOrderIterator` and then only specifying the topology once
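
   A rough, dependency-free sketch of that suggestion. `storeSupplierFor` and `buildTopology` are hypothetical helpers, and the strings stand in for the real store supplier and topology; the only point is that the `if` picks the supplier and the DSL chain is written once.

```java
import java.util.function.Supplier;

class TopologyOnceSketch {
    // Hypothetical: choose the store supplier from the test parameter.
    static Supplier<String> storeSupplierFor(final boolean inOrderIterator) {
        if (inOrderIterator) {
            return () -> "InOrder";
        }
        return () -> "topic-Canonized";
    }

    // Hypothetical: the single place where the topology is described.
    static String buildTopology(final Supplier<String> storeSupplier) {
        return "stream -> groupByKey -> windowedBy -> aggregate(" + storeSupplier.get() + ")";
    }

    static String build(final boolean inOrderIterator) {
        return buildTopology(storeSupplierFor(inOrderIterator));
    }
}
```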







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r482435367



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {

Review comment:
       Instead of the extra `foundRightWinAgg` boolean, can we just check if `rightWinAgg` is still equal to `null`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;

Review comment:
       Instead of asserting that this will be true at most once in the comment, we should do so in the code by checking `else if startTime == timestamp + 1` instead of just falling back to `else`. Tbh we should probably do the same for the `processInOrder` case and not make any assumptions (you can add an `else` case that throws `IllegalStateException` then, since every possible case should be covered by one of the above conditions)
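
   A sketch of the explicit branches, assuming a hypothetical `classify` helper and `Case` enum (neither is in the PR) and a positive `timeDifferenceMs`. Every condition is spelled out, so the final `else` can only be hit if an invariant is broken. Note that `endTime > timestamp`, `endTime == timestamp`, and `endTime < timestamp` already cover every value, so a bare trailing `else` after those three can never run; the right-window check has to be its own condition.

```java
class WindowCases {
    // Hypothetical labels for the four situations the iterator loop distinguishes.
    enum Case { ENDS_BEFORE_RECORD, LEFT_WINDOW_OF_RECORD, CONTAINS_RECORD, RIGHT_WINDOW_OF_RECORD }

    static Case classify(final long startTime, final long timeDifferenceMs, final long timestamp) {
        final long endTime = startTime + timeDifferenceMs;
        if (endTime < timestamp) {
            return Case.ENDS_BEFORE_RECORD;
        } else if (endTime == timestamp) {
            return Case.LEFT_WINDOW_OF_RECORD;
        } else if (startTime <= timestamp) {
            // endTime > timestamp is already known here, so the window contains the record
            return Case.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return Case.RIGHT_WINDOW_OF_RECORD;
        } else {
            // The fetch range should never return a window starting after timestamp + 1.
            throw new IllegalStateException("Unexpected window start " + startTime + " for record at " + timestamp);
        }
    }
}
```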

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record

Review comment:
       Is everything after this point the same for both `processInOrder` and `processReverse`? The only difference between the two is in the iterator loop, right? If so, we should try to reduce duplicate code and only invoke a different `in-order` vs `reverse` method for the loop
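
   A simplified sketch of that split, with hypothetical names (`ScanState`, `scanInOrder`, `scanReverse`, `windowsToCreate`) and window start times standing in for store entries. The two scans differ only in direction and in the early `break`; the window-creation tail is written once and takes whatever the scan found. The real tail also checks aggregates and previous-record timestamps, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for what the iterator loop learns; not a class from the PR.
class ScanState {
    boolean leftWinAlreadyCreated = false;
    boolean rightWinAlreadyCreated = false;
}

class SharedTailSketch {
    // Forward scan over window start times sorted ascending.
    static ScanState scanInOrder(final List<Long> ascendingStarts, final long timeDifferenceMs, final long timestamp) {
        final ScanState state = new ScanState();
        for (final long start : ascendingStarts) {
            update(state, start, timeDifferenceMs, timestamp);
        }
        return state;
    }

    // Reverse scan: same bookkeeping, but it stops at the first window that ends before the record.
    static ScanState scanReverse(final List<Long> ascendingStarts, final long timeDifferenceMs, final long timestamp) {
        final ScanState state = new ScanState();
        for (int i = ascendingStarts.size() - 1; i >= 0; i--) {
            final long start = ascendingStarts.get(i);
            if (start + timeDifferenceMs < timestamp) {
                break;
            }
            update(state, start, timeDifferenceMs, timestamp);
        }
        return state;
    }

    private static void update(final ScanState state, final long start, final long timeDifferenceMs, final long timestamp) {
        if (start + timeDifferenceMs == timestamp) {
            state.leftWinAlreadyCreated = true;
        } else if (start == timestamp + 1) {
            state.rightWinAlreadyCreated = true;
        }
    }

    // Shared tail: identical for both directions, so it lives in one place.
    static List<String> windowsToCreate(final ScanState state) {
        final List<String> actions = new ArrayList<>();
        if (!state.leftWinAlreadyCreated) {
            actions.add("createLeftWindow");
        }
        if (!state.rightWinAlreadyCreated) {
            actions.add("createRightWindow");
        }
        return actions;
    }
}
```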

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -608,10 +615,13 @@ public void testAggregateRandomInput() {
 
     private void verifyRandomTestResults(final Map<Long, ValueAndTimestamp<String>> actual) {

Review comment:
       I think it would be valuable to have all the tests run with both the forward and reverse iterators. You can actually parametrize the test class itself so that it runs multiple times with different input: the syntax is kind of hard to explain (and understand) but you can look at EosBetaUpgradeIntegrationTest as an example. It's parametrized by an `injectFailure` boolean -- you can do the same thing with a `forwardIteration` boolean.
   Then you could force it to run in the forward direction by providing a custom `WindowBytesStoreSupplier` that supplies a custom `WindowStore` implementation where the appropriate `fetch` method throws UnsupportedOperationException. You should be able to just extend one of the existing built-in stores (eg RocksDBWindowStore or InMemoryWindowStore) and override `fetch`. Let me know if you have any questions about how all this would work
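
   A dependency-free sketch of the store side of this, assuming the method to disable is the reverse one (`backwardFetch`). `SketchWindowStore`, `ForwardOnlyWindowStore`, and `storeFor` are hypothetical stand-ins, not Kafka classes. In a real JUnit 4 test the `forwardIteration` flag would come from the `Parameterized` runner instead of being passed by hand.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a built-in window store that supports both directions.
class SketchWindowStore {
    private final List<Long> starts = new ArrayList<>();

    void put(final long windowStart) {
        starts.add(windowStart);
        Collections.sort(starts);
    }

    List<Long> fetch() {
        return new ArrayList<>(starts);
    }

    List<Long> backwardFetch() {
        final List<Long> reversed = fetch();
        Collections.reverse(reversed);
        return reversed;
    }
}

// Same store, but reverse iteration is disabled to force the forward code path.
class ForwardOnlyWindowStore extends SketchWindowStore {
    @Override
    List<Long> backwardFetch() {
        throw new UnsupportedOperationException("reverse iteration disabled for this test run");
    }
}

class BothDirectionsSketch {
    // Picks the store for one parametrized run of the test class.
    static SketchWindowStore storeFor(final boolean forwardIteration) {
        return forwardIteration ? new ForwardOnlyWindowStore() : new SketchWindowStore();
    }

    static boolean supportsReverse(final SketchWindowStore store) {
        try {
            store.backwardFetch();
            return true;
        } catch (final UnsupportedOperationException e) {
            return false;
        }
    }
}
```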

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), timestamp));
+            windowStore.put(
+                key,
+                valueAndTime,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());        }
+
+        private void createPreviousRightWindow(final long windowStart,
+                                               final long currentRecordTimestamp,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final TimeWindow window = new TimeWindow(windowStart, windowStart + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), currentRecordTimestamp);
+            putAndForward(window, valueAndTime, key, value, closeTime, currentRecordTimestamp);
+        }
+
+        private void createCurrentRecordLeftWindow(final Long previousRecordTimestamp,
+                                               final long timestamp,
+                                               final ValueAndTimestamp<Agg> leftWinAgg,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final ValueAndTimestamp<Agg> valueAndTime;
+            // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+            if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
+                valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
+            } else {
+                valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+            }
+            final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
+            putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
         }
 
-        private boolean isLeftWindow(final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> window) {
-            return window.key.window().end() == window.value.timestamp();
+        private boolean leftWindowNotEmpty(final long previousTimestamp, final long currentTimestamp) {
+            return currentTimestamp - windows.timeDifferenceMs() <= previousTimestamp;
+        }
+
+        // previous record's right window does not already exist and current record falls within previous record's right window
+        private boolean rightWindowNecessaryAndPossible(final Set<Long> windowStartTimes,

Review comment:
       Sorry, but this method name continues to throw me off... the comment does a good job of reminding me what the check actually does/means, but ideally the method name alone would do a reasonable job of that. What about `previousRecordRightWindowMustBeCreated` or `previousRecordRightWindowDoesNotExistAndIsNotEmpty`? I know those are both super long, especially the 2nd option, but I personally think the 2nd option does the best job of providing the link between what the check actually does and why we do it. It's better to be clear than concise (when you can't be both).
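
To make the naming suggestion concrete, here is a minimal, self-contained sketch of the check under the second proposed name. The body is an assumption reconstructed from the code comment above the method (the window does not already exist, and the current record falls within it); the actual method body is not shown in this hunk. `timeDifferenceMs` is passed as a parameter only to keep the example standalone, and the class name `RightWindowCheck` is hypothetical.

```java
import java.util.Set;

// Hypothetical holder class, for illustration only.
final class RightWindowCheck {

    // True when the previous record's right window has not been stored yet
    // and the current record's timestamp is not past the end of that window.
    // Assumes the current record is later than the previous record, so it
    // cannot fall before the window start.
    static boolean previousRecordRightWindowDoesNotExistAndIsNotEmpty(final Set<Long> windowStartTimes,
                                                                      final long previousRightWindowStart,
                                                                      final long currentRecordTimestamp,
                                                                      final long timeDifferenceMs) {
        final boolean doesNotExist = !windowStartTimes.contains(previousRightWindowStart);
        final boolean isNotEmpty = previousRightWindowStart + timeDifferenceMs >= currentRecordTimestamp;
        return doesNotExist && isNotEmpty;
    }
}
```

Splitting the result into `doesNotExist` and `isNotEmpty` mirrors the two halves of the longer name, which may make the call site readable without the accompanying comment.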

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;

Review comment:
       I think it's also easiest to follow if we keep the conditions in the order that we will actually see them. So this case would be the first one (everything else is in order I think)
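
A rough sketch of the reordering, assuming the store is iterated backward (latest window start first) over a range whose upper bound is `timestamp + 1`, as in the quoted `backwardFetch` call. Note that in the quoted hunk the three comparisons on `endTime` (`>`, `==`, `<`) cover every possible value, so the trailing `else` cannot be reached as written; testing for the right window first sidesteps that. The enum and class names are hypothetical and only label the branches.

```java
// Hypothetical helper, for illustration only.
final class BackwardWindowOrder {

    enum Kind { CURRENT_RECORD_RIGHT_WINDOW, CONTAINS_RECORD, CURRENT_RECORD_LEFT_WINDOW, BEFORE_RECORD }

    // Branches are listed in the order a backward iteration would meet them.
    static Kind classify(final long startTime, final long timestamp, final long timeDifferenceMs) {
        final long endTime = startTime + timeDifferenceMs;
        if (startTime == timestamp + 1) {
            // seen at most once, on the first pass
            return Kind.CURRENT_RECORD_RIGHT_WINDOW;
        } else if (endTime > timestamp) {
            // window starts at or before the record and ends after it
            return Kind.CONTAINS_RECORD;
        } else if (endTime == timestamp) {
            return Kind.CURRENT_RECORD_LEFT_WINDOW;
        } else {
            // window ends before the record; iteration can stop here
            return Kind.BEFORE_RECORD;
        }
    }
}
```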

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }

Review comment:
       Since this is the current record's left window, either this condition is true or we already had a record with the same timestamp as the current record. Just throwing out a suggestion: maybe we could keep a boolean that tracks whether we already have a record at the current timestamp, and if so we can actually skip everything after the loop.
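
One possible shape for that boolean, as a sketch only. It relies on the reasoning above: a window ending at `timestamp` cannot hold a record later than `timestamp`, so if its maximum record timestamp is not smaller, it must be equal. The class and method names are hypothetical, and whether the post-loop steps can really be skipped would still need to be verified against the full processor.

```java
// Hypothetical helper, for illustration only.
final class SameTimestampCheck {

    // True when the window is the current record's left window (it ends at
    // `timestamp`) and it already holds a record at exactly `timestamp`.
    static boolean recordAlreadyAtTimestamp(final long windowEndTime,
                                            final long windowMaxRecordTimestamp,
                                            final long timestamp) {
        return windowEndTime == timestamp && windowMaxRecordTimestamp == timestamp;
    }
}
```

Inside the loop, the result could be stored in a flag when the `endTime == timestamp` branch is taken and checked once after the iterator is closed.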

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), timestamp));
+            windowStore.put(
+                key,
+                valueAndTime,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());        }
+
+        private void createPreviousRightWindow(final long windowStart,

Review comment:
       nit: name it `createPreviousRecordRightWindow` for consistency/clarity

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +120,56 @@ public void process(final K key, final V value) {
             }
 
             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding window
-            if (timestamp < windows.timeDifferenceMs()) {
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+
+            if (timestamp + 1 + windows.timeDifferenceMs() <= closeTime) {
                 log.warn(
-                    "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
+                    "Skipping record for expired window. " +
+                        "key=[{}] " +
+                        "topic=[{}] " +
+                        "partition=[{}] " +
+                        "offset=[{}] " +
+                        "timestamp=[{}] " +
+                        "window=[{},{}] " +
+                        "expiration=[{}] " +
+                        "streamTime=[{}]",
+                    key,
+                    context().topic(),
+                    context().partition(),
+                    context().offset(),
+                    context().timestamp(),
+                    timestamp - windows.timeDifferenceMs(), timestamp,
+                    closeTime,
+                    observedStreamTime
                 );
-                droppedRecordsSensor.record();
+                lateRecordDropSensor.record();
                 return;
             }
-            processInOrder(key, value, timestamp);
-        }
 
-        public void processInOrder(final K key, final V value, final long timestamp) {
+            if (timestamp < windows.timeDifferenceMs()) {
+                processEarly(key, value, timestamp, closeTime);
+                return;
+            }
 
-            observedStreamTime = Math.max(observedStreamTime, timestamp);
-            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            if (reverseIteratorPossible == null) {
+                try {
+                    windowStore.backwardFetch(key, 0L, 0L);
+                    reverseIteratorPossible = true;
+                } catch (final UnsupportedOperationException e)  {
+                    reverseIteratorPossible = false;

Review comment:
       We should log a debug message indicating which iterator (forward or reverse) we decide to use
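
A minimal sketch of what that could look like, assuming the probe stays a one-time check cached in a nullable `Boolean` as in the hunk above. The `BackwardFetch` interface and the `ReverseIteratorProbe` class are hypothetical stand-ins for the window store call, not Kafka APIs, and `java.util.logging` is used only to keep the example self-contained (the processor itself logs through its own `log` field).

```java
import java.util.logging.Logger;

public class ReverseIteratorProbe {
    private static final Logger LOG = Logger.getLogger(ReverseIteratorProbe.class.getName());

    // Hypothetical stand-in for windowStore.backwardFetch(key, 0L, 0L).
    public interface BackwardFetch {
        void probe();
    }

    // Null until the first probe, mirroring reverseIteratorPossible in the diff.
    private Boolean reverseIteratorPossible = null;

    public boolean useReverseIterator(final BackwardFetch store) {
        if (reverseIteratorPossible == null) {
            try {
                store.probe();
                reverseIteratorPossible = true;
                LOG.fine("Sliding windows processing will use a reverse iterator");
            } catch (final UnsupportedOperationException e) {
                reverseIteratorPossible = false;
                LOG.fine("Sliding windows processing will use a forward iterator");
            }
        }
        // The decision is cached, so the message is logged at most once per instance.
        return reverseIteratorPossible;
    }
}
```

Logging inside both branches keeps the message next to the decision, so a test can look for it to confirm which path was taken.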

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), timestamp));
+            windowStore.put(
+                key,
+                valueAndTime,
+                window.start());
+            tupleForwarder.maybeForward(
+                new Windowed<>(key, window),
+                rightWinAgg.value(),
+                null,
+                rightWinAgg.timestamp());        }
+
+        private void createPreviousRightWindow(final long windowStart,
+                                               final long currentRecordTimestamp,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final TimeWindow window = new TimeWindow(windowStart, windowStart + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), currentRecordTimestamp);
+            putAndForward(window, valueAndTime, key, value, closeTime, currentRecordTimestamp);
+        }
+
+        private void createCurrentRecordLeftWindow(final Long previousRecordTimestamp,
+                                               final long timestamp,
+                                               final ValueAndTimestamp<Agg> leftWinAgg,
+                                               final K key,
+                                               final V value,
+                                               final long closeTime) {
+            final ValueAndTimestamp<Agg> valueAndTime;
+            // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+            if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {

Review comment:
       We might as well move the `previousRecordTimestamp` null check into `leftWindowNotEmpty`. Also you can probably remove the comment then since it's saying basically the same thing as `if leftWindowNotEmpty`
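
A sketch of the helper with the null check folded in. The body of `leftWindowNotEmpty` is not shown in this hunk, so the range test below is an assumption about what it checks (that the previous record falls inside the current record's left window), and `timeDifferenceMs` is passed as a parameter only to keep the example standalone.

```java
public class LeftWindowCheck {
    // Assumed semantics: the left window [timestamp - timeDifferenceMs, timestamp]
    // is non-empty when a previous record exists and falls inside it.
    public static boolean leftWindowNotEmpty(final Long previousRecordTimestamp,
                                             final long timestamp,
                                             final long timeDifferenceMs) {
        return previousRecordTimestamp != null
            && timestamp - timeDifferenceMs <= previousRecordTimestamp;
    }
}
```

The call site then reduces to `if (leftWindowNotEmpty(previousRecordTimestamp, timestamp))`, which reads the same as the comment it replaces.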

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
 
             //create right window for previous record
-            if (latestLeftTypeWindow != null) {
-                final long rightWinStart = latestLeftTypeWindow.end() + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                    putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
                 }
             }
 
             //create left window for new record
             if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            // create right window for new record, if necessary
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                //if we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (endTime > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (endTime == timestamp) {
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                    } else if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                        break;
+                    } else {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            //create right window for previous record
+            if (previousRecordTimestamp != null) {
+                final long previousRightWinStart = previousRecordTimestamp + 1;
+                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+                    createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
+                }
+            }
+
+            //create the left window of the current record if it's not created
+            if (!leftWinAlreadyCreated) {
+                createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                        if (windowMaxRecordTimestamp < timestamp) {
+                            // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                            // previous record's right window would have been created already by other records. This
+                            // will always be true for early records, as they all fall within [0, timeDifferenceMs].
+                            previousRecordTimestamp = windowMaxRecordTimestamp;
+                        }
+
+                    } else if (startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
                 }
-                final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
             }
-            //create right window for new record
+
+            // if there wasn't a right window agg found and we need a right window for our new record,
+            // the current aggregate in the combined window will go in the new record's right window
+            if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            //create right window for new record if needed
             if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+            }
+
+            //create the right window for the previous record if the previous record exists and the window hasn't already been created
+            if (previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)) {
+                createPreviousRightWindow(previousRecordTimestamp + 1, timestamp, key, value, closeTime);
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+
+            } else {
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp);
             }
+
         }
 
-        private boolean rightWindowIsNotEmpty(final ValueAndTimestamp<Agg> rightWinAgg, final long timestamp) {
-            return rightWinAgg != null && rightWinAgg.timestamp() > timestamp;
+        private void createCurrentRecordRightWindow(final long timestamp,
+                                                    final ValueAndTimestamp<Agg> rightWinAgg,
+                                                    final K key) {
+            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(rightWinAgg.value(), Math.max(rightWinAgg.timestamp(), timestamp));

Review comment:
       `Math.max(rightWinAgg.timestamp(), timestamp)` doesn't make sense for this case, since we're not actually putting the current record in the window (although technically it will still choose the correct timestamp, for that reason).
   But I think this might just be out of date and need to be rebased after the changes in the early records PR?
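
To illustrate why the `Math.max` is redundant here: the call is guarded by `rightWindowIsNotEmpty`, which (as quoted in the hunk above) already requires `rightWinAgg.timestamp() > timestamp`. The sketch below reduces the aggregate to its timestamp; the class and method names other than `rightWindowIsNotEmpty` are hypothetical.

```java
public class RightWindowTimestamp {
    // Same condition as rightWindowIsNotEmpty in the diff, applied to the aggregate's timestamp.
    public static boolean rightWindowIsNotEmpty(final Long rightWinAggTimestamp, final long timestamp) {
        return rightWinAggTimestamp != null && rightWinAggTimestamp > timestamp;
    }

    // What the diff currently stores for the new right window.
    public static long timestampWithMax(final long rightWinAggTimestamp, final long timestamp) {
        return Math.max(rightWinAggTimestamp, timestamp);
    }
}
```

Whenever the guard passes, `timestampWithMax` returns the aggregate's own timestamp, so passing `rightWinAgg.timestamp()` directly would behave the same under this guard.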

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +120,56 @@ public void process(final K key, final V value) {
             }
 
             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding window
-            if (timestamp < windows.timeDifferenceMs()) {
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+
+            if (timestamp + 1 + windows.timeDifferenceMs() <= closeTime) {
                 log.warn(
-                    "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
+                    "Skipping record for expired window. " +
+                        "key=[{}] " +
+                        "topic=[{}] " +
+                        "partition=[{}] " +
+                        "offset=[{}] " +
+                        "timestamp=[{}] " +
+                        "window=[{},{}] " +
+                        "expiration=[{}] " +
+                        "streamTime=[{}]",
+                    key,
+                    context().topic(),
+                    context().partition(),
+                    context().offset(),
+                    context().timestamp(),
+                    timestamp - windows.timeDifferenceMs(), timestamp,
+                    closeTime,
+                    observedStreamTime
                 );
-                droppedRecordsSensor.record();
+                lateRecordDropSensor.record();
                 return;
             }
-            processInOrder(key, value, timestamp);
-        }
 
-        public void processInOrder(final K key, final V value, final long timestamp) {
+            if (timestamp < windows.timeDifferenceMs()) {
+                processEarly(key, value, timestamp, closeTime);
+                return;
+            }
 
-            observedStreamTime = Math.max(observedStreamTime, timestamp);
-            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            if (reverseIteratorPossible == null) {
+                try {
+                    windowStore.backwardFetch(key, 0L, 0L);
+                    reverseIteratorPossible = true;
+                } catch (final UnsupportedOperationException e)  {
+                    reverseIteratorPossible = false;

Review comment:
       Also we can then use the log message to verify that the correct `process` method gets chosen







[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487171555



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       But I wouldn't be afraid to just use a full if/else block, either.
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier;
           if (inOrderIterator) {
               storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false);
           } else {
               storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
           }
   ```







[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
         assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
         assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
     }
-}
\ No newline at end of file
+
+    private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment:
       Beautiful. Thanks!

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Ah, I belatedly realized what I think was @ableegoldman's concern. Suppress cares about the timestamps of the records, not the window start times. Since the timestamp of the windowed aggregation results is determined by the input record, not the window start times, all window agg updates that get forwarded happen "at the same time", right?
   
   If that's true, then it doesn't matter the order we forward them in.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       FWIW, I agree. The result of this aggregation is a KTable, i.e., a changelog stream. The semantics of the changelog stream only state that each record represents the latest state for that record's key. Just like the caches are free to drop arbitrary intermediate updates and KIP-557 is free to drop arbitrary idempotent updates, a processor that forwards updates for multiple different keys should be free to do it in any order.
   
   In fact, I might go so far as to suggest that a proper behavioral test would load all the results into a map so that the test isn't sensitive to meaningless changes like this. But I won't go quite that far because it seems good to have the opportunity to ask questions like @ableegoldman's, just to be sure nothing unexpected is happening when we change things later.
   
   I also think it's better for performance not to try and buffer the window results and emit them in "forward" order, since it might be an arbitrarily large number of updates for us to keep in memory.
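   
   To illustrate the "load the results into a map" idea, here is a minimal sketch. It is not code from this PR: `Update` and `latestPerKey` are hypothetical stand-ins for the captured `KeyValueTimestamp` records, and the windowed key is flattened to a string for brevity. The values are the updates emitted while processing A@600 in the expected output above.
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class LatestPerKey {
       // Hypothetical stand-in for one forwarded update; not a Kafka Streams class.
       static final class Update {
           final String windowedKey;
           final long count;
   
           Update(final String windowedKey, final long count) {
               this.windowedKey = windowedKey;
               this.count = count;
           }
       }
   
       // Changelog semantics: a later update for a key replaces the earlier one.
       static Map<String, Long> latestPerKey(final List<Update> updates) {
           final Map<String, Long> latest = new HashMap<>();
           for (final Update update : updates) {
               latest.put(update.windowedKey, update.count);
           }
           return latest;
       }
   
       public static void main(final String[] args) {
           final List<Update> oneOrder = Arrays.asList(
               new Update("1@[499,999]", 3L),
               new Update("1@[501,1001]", 2L),
               new Update("1@[100,600]", 2L),
               new Update("1@[601,1101]", 1L));
           final List<Update> otherOrder = Arrays.asList(
               new Update("1@[601,1101]", 1L),
               new Update("1@[100,600]", 2L),
               new Update("1@[501,1001]", 2L),
               new Update("1@[499,999]", 3L));
           // Map equality ignores insertion order, so both orders compare equal.
           System.out.println(latestPerKey(oneOrder).equals(latestPerKey(otherOrder))); // prints true
       }
   }
   ```
   
   The trade-off is the one mentioned above: a map-based assertion hides any change in forwarding order, including the ones we would want to ask about.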

##########
File path: checkstyle/suppressions.xml
##########
@@ -203,6 +203,9 @@
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
+    <suppress checks="MethodLength"
+              files="KStreamSlidingWindowAggregateTest.java"/>

Review comment:
       It seems like resolving @ableegoldman 's comment ( https://github.com/apache/kafka/pull/9239/files#r486627464  ) would make this unnecessary.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       It might matter for emit-on-full suppress buffers (if they actually get full), but even then, I think it's equally correct either way, so I don't think we need to be concerned.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder

Review comment:
       Yes, this would be better. Not sure if it helps, but for reference, this is what we did in `org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinMaterializationIntegrationTest#getTopology`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,32 +221,67 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final long startTime = windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass

Review comment:
       Yeah, it looks pretty straightforward without the comment. If anything would deserve clarification in a comment, it would be a reminder that we do _not_ add the current record to the right-hand window. Then again, it's pretty fundamental to the algorithm.
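   
   For anyone reading along, a small sketch of why the current record never lands in its own right-hand window. This is illustrative only: `SlidingWindowBounds` is a hypothetical helper, not part of the processor, and it assumes both window bounds are inclusive and that the record timestamp is at least the time difference (so no early-record handling).
   
   ```java
   final class SlidingWindowBounds {
       // Left window of a record ends at the record's own timestamp.
       static long[] leftWindow(final long recordTimestamp, final long timeDifferenceMs) {
           return new long[] {recordTimestamp - timeDifferenceMs, recordTimestamp};
       }
   
       // Right window starts one millisecond after the record, mirroring
       // "previousRecordTimestamp + 1" in the diff above.
       static long[] rightWindow(final long recordTimestamp, final long timeDifferenceMs) {
           final long start = recordTimestamp + 1;
           return new long[] {start, start + timeDifferenceMs};
       }
   
       // Assumes inclusive bounds on both ends.
       static boolean contains(final long[] window, final long timestamp) {
           return window[0] <= timestamp && timestamp <= window[1];
       }
   
       public static void main(final String[] args) {
           final long ts = 999L;
           final long diff = 500L;
           final long[] left = leftWindow(ts, diff);   // [499, 999]
           final long[] right = rightWindow(ts, diff); // [1000, 1500]
           System.out.println(contains(left, ts) + " " + contains(right, ts)); // prints true false
       }
   }
   ```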

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       The Materialized builder is notoriously vulnerable to "weird type nonsense" because it falls into a gap in Java's type inference system when you use chained methods. Let's see what happens when you implement @ableegoldman 's earlier suggestion.
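   
   The inference gap can be reproduced without Kafka at all. Below is a minimal sketch using a hypothetical `Named` builder (it only mimics the shape of `Materialized.as(...).withValueSerde(...)`, it is not the real API): the receiver of a chained call gets no target type, so the generic factory falls back to `Object` unless a type witness is given.
   
   ```java
   import java.util.function.Function;
   
   // Hypothetical builder with a generic static factory and a chained setter.
   final class Named<V> {
       private final String name;
       private Function<V, String> formatter = value -> String.valueOf(value);
   
       private Named(final String name) {
           this.name = name;
       }
   
       static <V> Named<V> as(final String name) {
           return new Named<>(name);
       }
   
       Named<V> withFormatter(final Function<V, String> formatter) {
           this.formatter = formatter;
           return this;
       }
   
       String describe(final V value) {
           return name + "=" + formatter.apply(value);
       }
   }
   
   final class ChainedInferenceDemo {
       static String demo() {
           // Does not compile: as("count") is inferred as Named<Object> because
           // the assignment target is not visible through the chained call.
           // final Named<Integer> broken = Named.as("count").withFormatter(i -> "#" + (i + 1));
   
           // Compiles: an explicit type witness, as in Materialized.<K, V, S>as(...).
           final Named<Integer> witnessed = Named.<Integer>as("count").withFormatter(i -> "#" + (i + 1));
   
           // Compiles: without chaining, the assignment target drives inference.
           final Named<Integer> unchained = Named.as("count");
           unchained.withFormatter(i -> "#" + (i + 1));
   
           return witnessed.describe(41) + " " + unchained.describe(41);
       }
   
       public static void main(final String[] args) {
           System.out.println(demo()); // prints count=#42 count=#42
       }
   }
   ```
   
   Pulling the store supplier into its own typed local variable is the "unchained" variant, which is why the earlier suggestion may sidestep the problem.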

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -68,7 +68,7 @@
 
     private volatile boolean open = false;
 
-    InMemoryWindowStore(final String name,
+    public InMemoryWindowStore(final String name,

Review comment:
       Thanks for the explanation. This sounds fine to me.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       This is how I typically break up ternaries. 
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier = 
               inOrderIterator 
                   ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) 
                   : Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       But I wouldn't be afraid to just use a full if/else block, either.
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier;
           if (inOrderIterator) {
               storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false);
           } else {
               storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
           }
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Yeah, this sounds right. Either the current record's timestamp is less than the event time for some of the windows, in which case it doesn't advance the partition's stream time, or it is more advanced than the (prior) event time for all the windows, in which case it does advance the stream time, but all the updated windows' event times are equal to the current record's timestamp, which is also equal to the new stream time, which should also be ok for suppression.
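   
   The two cases can be written out as a tiny sketch. It rests on two assumptions stated earlier in this thread rather than on the actual suppression code: stream time is the maximum record timestamp seen so far on the partition, and a window's event time is the maximum timestamp of the records in it. `StreamTimeSketch` is a hypothetical helper.
   
   ```java
   final class StreamTimeSketch {
       // Assumed: stream time only moves forward, to the largest timestamp seen.
       static long advanceStreamTime(final long streamTime, final long recordTimestamp) {
           return Math.max(streamTime, recordTimestamp);
       }
   
       // Assumed: a window's event time is the largest timestamp it contains.
       static long updatedEventTime(final long priorEventTime, final long recordTimestamp) {
           return Math.max(priorEventTime, recordTimestamp);
       }
   
       public static void main(final String[] args) {
           // Case 1: out-of-order record (600) arrives after stream time reached 999.
           long streamTime = advanceStreamTime(999L, 600L);       // stays 999
           long eventTime = updatedEventTime(999L, 600L);         // stays 999
           System.out.println(eventTime <= streamTime);
   
           // Case 2: record (1000) is newer than every window's prior event time.
           streamTime = advanceStreamTime(999L, 1000L);           // becomes 1000
           eventTime = updatedEventTime(999L, 1000L);             // becomes 1000
           System.out.println(eventTime == streamTime);
       }
   }
   ```
   
   In both cases no forwarded update carries a timestamp beyond the new stream time, regardless of the order in which the windows are forwarded.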








[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), Duration.ofMillis(10), false);

Review comment:
       These lines ended up being pretty long but I wasn't sure how to best split them up. WDYT @ableegoldman ?




