You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/04/27 16:11:23 UTC

[kafka] branch 3.0 updated: fix: make sliding window works without grace period (#kafka-13739) (#11980)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 06b6212993 fix: make sliding window works without grace period (#kafka-13739) (#11980)
06b6212993 is described below

commit 06b621299310b944b385dd087b1dfb694eb75798
Author: Bounkong Khamphousone <bo...@gmail.com>
AuthorDate: Wed Apr 6 15:43:44 2022 +0200

    fix: make sliding window works without grace period (#kafka-13739) (#11980)
    
    backport of kafka-13739
---
 .../internals/KStreamSlidingWindowAggregate.java   |   2 +-
 .../KStreamSlidingWindowAggregateTest.java         | 304 +++++++++++++++++++++
 2 files changed, 305 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index db91bb3a7b..84733b3bab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -448,7 +448,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
                                             final long inputRecordTimestamp) {
             final long windowStart = window.start();
             final long windowEnd = window.end();
-            if (windowEnd > closeTime) {
+            if (windowEnd >= closeTime) {
                 //get aggregate from existing window
                 final Agg oldAgg = getValueOrNull(valueAndTime);
                 final Agg newAgg = aggregator.apply(key, value, oldAgg);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 798159d353..45897b0ccd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -703,6 +703,310 @@ public class KStreamSlidingWindowAggregateTest {
         );
     }
 
+    @Test
+    public void testEarlyNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        // aggregate each key's values into 50ms sliding windows with no grace period
+        final KTable<Windowed<String>, String> table2 = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        // every record is an early record: each timestamp is below the window's 50ms time difference
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 0L);
+            inputTopic.pipeInput("A", "2", 5L);
+            inputTopic.pipeInput("A", "3", 6L);
+            inputTopic.pipeInput("A", "4", 3L);
+            inputTopic.pipeInput("A", "5", 13L);
+            inputTopic.pipeInput("A", "6", 10L);
+        }
+        // keep only the last result emitted for each window start
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        // aggregate each key's values into 50ms sliding windows with no grace period
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+        // same records as testEarlyNoGracePeriodSmallInput shifted by +100ms, so no timestamp is below 50ms
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 100L);
+            inputTopic.pipeInput("A", "2", 105L);
+            inputTopic.pipeInput("A", "3", 106L);
+            inputTopic.pipeInput("A", "4", 103L);
+            inputTopic.pipeInput("A", "5", 113L);
+            inputTopic.pipeInput("A", "6", 110L);
+        }
+        // keep only the last result emitted for each window start
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+        expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+        expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+        expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+        expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+        expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+        expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+        expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+        expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testEarlyNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false);
+        // aggregate each key's values into 10ms sliding windows with no grace period, backed by storeSupplier
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+        // records with timestamps below the 10ms time difference (0, 5, 6, 3, 4, 2) are early records
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+            // "8"@2 is expected to produce no update: no expected aggregate below contains "8"
+            inputTopic1.pipeInput("E", "1", 0L);
+            inputTopic1.pipeInput("E", "3", 5L);
+            inputTopic1.pipeInput("E", "4", 6L);
+            inputTopic1.pipeInput("E", "2", 3L);
+            inputTopic1.pipeInput("E", "6", 13L);
+            inputTopic1.pipeInput("E", "5", 10L);
+            inputTopic1.pipeInput("E", "7", 4L);
+            inputTopic1.pipeInput("E", "8", 2L);
+            inputTopic1.pipeInput("E", "9", 15L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
+        // in the expected list, each "E@t" comment names the record (key E, timestamp t) whose arrival emitted that update
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1", 0),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2", 6),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5+7", 13),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5+7", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5+9", 15),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5+9", 15),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5+9", 15),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6+9", 15),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15)),
+            actual
+        );
+    }
+
+    @Test
+    public void testNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false);
+        // aggregate each key's values into 10ms sliding windows with no grace period, backed by storeSupplier
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+        // same records as testEarlyNoGracePeriodLargeInput shifted by +100ms, so no timestamp is below 10ms
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+            // "8"@102 is expected to produce no update: no expected aggregate below contains "8"
+            inputTopic1.pipeInput("E", "1", 100L);
+            inputTopic1.pipeInput("E", "3", 105L);
+            inputTopic1.pipeInput("E", "4", 106L);
+            inputTopic1.pipeInput("E", "2", 103L);
+            inputTopic1.pipeInput("E", "6", 113L);
+            inputTopic1.pipeInput("E", "5", 110L);
+            inputTopic1.pipeInput("E", "7", 104L);
+            inputTopic1.pipeInput("E", "8", 102L);
+            inputTopic1.pipeInput("E", "9", 115L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
+        // in the expected list, each "E@t" comment names the record (key E, timestamp t) whose arrival emitted that update
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@100
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90, 100)), "0+1", 100),
+                // E@105
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95, 105)), "0+1+3", 105),
+                // E@106
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4", 106),
+                // E@103
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4+2", 106),
+                // E@105
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3", 105),
+                // E@106
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4", 106),
+                // E@103
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4+2", 106),
+                //E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6", 113),
+                //E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5", 113),
+                //E@104
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113),
+                // E@103
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4", 106),
+                //E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6", 113),
+                //E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5", 113),
+                //E@104
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5+7", 113),
+                //E@104
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5", 113),
+                //E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5+9", 115),
+                // E@106
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4", 106),
+                //E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6", 113),
+                //E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5", 113),
+                //E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5+9", 115),
+                //E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6", 113),
+                //E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5", 113),
+                //E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5+9", 115),
+                //E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6", 113),
+                //E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6+9", 115),
+                //E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(114, 124)), "0+9", 115)),
+            actual
+        );
+    }
+
     @Test
     public void shouldLogAndMeterWhenSkippingNullKey() {
         final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;