You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2022/03/31 14:07:43 UTC

[kafka] branch trunk updated: fix: make sliding window works without grace period (#kafka-13739) (#11928)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c279b6  fix: make sliding window works without grace period (#kafka-13739) (#11928)
3c279b6 is described below

commit 3c279b63fa862671330b568f5f58df69f9352c35
Author: Bounkong Khamphousone <bo...@gmail.com>
AuthorDate: Thu Mar 31 16:05:53 2022 +0200

    fix: make sliding window works without grace period (#kafka-13739) (#11928)
    
    Fix upperbound for sliding window, making it compatible with no grace period (kafka-13739)
    
    Added unit test for early sliding window and "normal" sliding window for both events within one time difference (small input) and above window time difference (large input).
    
    Fixing this window interval may slightly change stream behavior but probability to happen is extremely slow and may not have a huge impact on the result given.
    
    Reviewers Leah Thomas <lt...@confluent.io>, Bill Bejeck <bb...@apache.org>
---
 .../internals/KStreamSlidingWindowAggregate.java   |   2 +-
 .../KStreamSlidingWindowAggregateTest.java         | 304 +++++++++++++++++++++
 2 files changed, 305 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index fd0198b..73c901c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -473,7 +473,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
                                             final long closeTime) {
             final long windowStart = window.start();
             final long windowEnd = window.end();
-            if (windowEnd > closeTime) {
+            if (windowEnd >= closeTime) {
                 //get aggregate from existing window
                 final VAgg oldAgg = getValueOrNull(valueAndTime);
                 final VAgg newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 8e8115f..b227c71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -699,6 +699,310 @@ public class KStreamSlidingWindowAggregateTest {
     }
 
     @Test
+    public void testEarlyNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        // all events are considered as early events since record timestamp is less than time difference of the window
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 0L);
+            inputTopic.pipeInput("A", "2", 5L);
+            inputTopic.pipeInput("A", "3", 6L);
+            inputTopic.pipeInput("A", "4", 3L);
+            inputTopic.pipeInput("A", "5", 13L);
+            inputTopic.pipeInput("A", "6", 10L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 100L);
+            inputTopic.pipeInput("A", "2", 105L);
+            inputTopic.pipeInput("A", "3", 106L);
+            inputTopic.pipeInput("A", "4", 103L);
+            inputTopic.pipeInput("A", "5", 113L);
+            inputTopic.pipeInput("A", "6", 110L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+        expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+        expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+        expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+        expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+        expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+        expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+        expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+        expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testEarlyNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 0L);
+            inputTopic1.pipeInput("E", "3", 5L);
+            inputTopic1.pipeInput("E", "4", 6L);
+            inputTopic1.pipeInput("E", "2", 3L);
+            inputTopic1.pipeInput("E", "6", 13L);
+            inputTopic1.pipeInput("E", "5", 10L);
+            inputTopic1.pipeInput("E", "7", 4L);
+            inputTopic1.pipeInput("E", "8", 2L);
+            inputTopic1.pipeInput("E", "9", 15L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1", 0),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2", 6),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5+7", 13),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5+7", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5+9", 15),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5+9", 15),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5+9", 15),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6+9", 15),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15)),
+            actual
+        );
+    }
+
+    @Test
+    public void testNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 100L);
+            inputTopic1.pipeInput("E", "3", 105L);
+            inputTopic1.pipeInput("E", "4", 106L);
+            inputTopic1.pipeInput("E", "2", 103L);
+            inputTopic1.pipeInput("E", "6", 113L);
+            inputTopic1.pipeInput("E", "5", 110L);
+            inputTopic1.pipeInput("E", "7", 104L);
+            inputTopic1.pipeInput("E", "8", 102L);
+            inputTopic1.pipeInput("E", "9", 115L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90, 100)), "0+1", 100),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95, 105)), "0+1+3", 105),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4", 106),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4+2", 106),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3", 105),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4", 106),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4+2", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5+7", 113),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5+9", 115),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4", 106),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5+9", 115),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6", 113),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5+9", 115),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6", 113),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6+9", 115),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(114, 124)), "0+9", 115)),
+            actual
+        );
+    }
+
+    @Test
     public void shouldLogAndMeterWhenSkippingNullKey() {
         final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;
         final StreamsBuilder builder = new StreamsBuilder();