You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/11 02:55:35 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
         assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
         assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
     }
-}
\ No newline at end of file
+
+    private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment:
       Beautiful. Thanks!

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Ah, I belatedly realized what I think was @ableegoldman's concern. Suppress cares about the timestamps of the records, not the window start times. Since the timestamp of the windowed aggregation results are determined by the input record, not the window start times, all window agg updates that get forwarded happen "at the same time", right?
   
   If that's true, then it doesn't matter the order we forward them in.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
       FWIW, I agree. The result of this aggregation is a KTable, i.e., a changelog stream. The semantics of the changelog stream only state that each record represents the latest state for that record's key. Just like the caches are free to drop arbitrary intermediate updates and KIP-557 is free to drop arbitrary idempotent updates, a processor that forwards updates for multiple different keys should be free to do it in any order.
   
   In fact, I might go so far as to suggest that a proper behavioral test would load all the results into a map so that the test isn't sensitive to meaningless changes like this. But I won't go quite that far because it seems good to to have the opportunity to ask questions like @ableegoldman's. Just to be sure nothing unexpected is happening when we change things later.
   
   I also think it's better for performance not to try and buffer the window results and emit them in "forward" order, since it might be an arbitrarily large number of updates for us to keep in memory.

##########
File path: checkstyle/suppressions.xml
##########
@@ -203,6 +203,9 @@
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
+    <suppress checks="MethodLength"
+              files="KStreamSlidingWindowAggregateTest.java"/>

Review comment:
       It seems like resolving @ableegoldman 's comment ( https://github.com/apache/kafka/pull/9239/files#r486627464  ) would make this unnecessary.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       It might matter for emit-on-full suppress buffers (if they actually get full), but even then, I think it's equally correct either way, so I don't think we need to be concerned.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder

Review comment:
       Yes, this would be better. Not sure if it helps, but for reference, this is what we did in `org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinMaterializationIntegrationTest#getTopology`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,32 +221,67 @@ public void processInOrder(final K key, final V value, final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                    final long startTime = windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, will only be true at most once, on the first pass

Review comment:
       Yeah, it looks pretty straightforward without the comment. If anything would deserve clarification in a comment, it would be a reminder that we do _not_ add the current record to the right-hand window. Then again, it's pretty fundamental to the algorithm.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       The Materialized builder is notoriously vulnerable to "weird type nonsense" because it falls into a gap in Java's type inference system when you use chained methods. Let's see what happens when you implement @ableegoldman 's earlier suggestion.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -68,7 +68,7 @@
 
     private volatile boolean open = false;
 
-    InMemoryWindowStore(final String name,
+    public InMemoryWindowStore(final String name,

Review comment:
       Thanks for the explanation. This sounds fine to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org