You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/04/27 16:11:23 UTC
[kafka] branch 3.0 updated: fix: make sliding window works without grace period (#kafka-13739) (#11980)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 06b6212993 fix: make sliding window works without grace period (#kafka-13739) (#11980)
06b6212993 is described below
commit 06b621299310b944b385dd087b1dfb694eb75798
Author: Bounkong Khamphousone <bo...@gmail.com>
AuthorDate: Wed Apr 6 15:43:44 2022 +0200
fix: make sliding window works without grace period (#kafka-13739) (#11980)
backport of kafka-13739
---
.../internals/KStreamSlidingWindowAggregate.java | 2 +-
.../KStreamSlidingWindowAggregateTest.java | 304 +++++++++++++++++++++
2 files changed, 305 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index db91bb3a7b..84733b3bab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -448,7 +448,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
final long inputRecordTimestamp) {
final long windowStart = window.start();
final long windowEnd = window.end();
- if (windowEnd > closeTime) {
+ if (windowEnd >= closeTime) {
//get aggregate from existing window
final Agg oldAgg = getValueOrNull(valueAndTime);
final Agg newAgg = aggregator.apply(key, value, oldAgg);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 798159d353..45897b0ccd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -703,6 +703,310 @@ public class KStreamSlidingWindowAggregateTest {
);
}
+ @Test
+ public void testEarlyNoGracePeriodSmallInput() { // sliding windows, no grace period, all records early
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ // all records are "early" records: every timestamp piped below (0-13) is smaller than the 50 ms time difference of the window
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 0L);
+ inputTopic.pipeInput("A", "2", 5L);
+ inputTopic.pipeInput("A", "3", 6L);
+ inputTopic.pipeInput("A", "4", 3L); // out of order: earlier than the previous record (6)
+ inputTopic.pipeInput("A", "5", 13L);
+ inputTopic.pipeInput("A", "6", 10L); // out of order: earlier than the previous record (13)
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); // last captured result per window start
+ for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ if (actual.putIfAbsent(start, valueAndTimestamp) != null) { // a result for this window start was already recorded
+ actual.replace(start, valueAndTimestamp); // later results overwrite earlier ones
+ }
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); // window start -> final aggregate and its timestamp
+ expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+ expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+ expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+ expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+ expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+ expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testNoGracePeriodSmallInput() { // sliding windows, no grace period, timestamps above the time difference
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { // every timestamp below (100-113) exceeds the 50 ms time difference
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 100L);
+ inputTopic.pipeInput("A", "2", 105L);
+ inputTopic.pipeInput("A", "3", 106L);
+ inputTopic.pipeInput("A", "4", 103L); // out of order: earlier than the previous record (106)
+ inputTopic.pipeInput("A", "5", 113L);
+ inputTopic.pipeInput("A", "6", 110L); // out of order: earlier than the previous record (113)
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); // last captured result per window start
+ for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ if (actual.putIfAbsent(start, valueAndTimestamp) != null) { // a result for this window start was already recorded
+ actual.replace(start, valueAndTimestamp); // later results overwrite earlier ones
+ }
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); // window start -> final aggregate and its timestamp
+ expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+ expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+ expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+ expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+ expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+ expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+ expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+ expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+ expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testEarlyNoGracePeriodLargeInput() { // 10 ms sliding windows, no grace period, timestamps 0-15
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+ final WindowBytesStoreSupplier storeSupplier =
+ inOrderIterator // declared outside this hunk; selects which window store backs the aggregate
+ ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
+ : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(storeSupplier)
+ );
+
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic1 =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+ inputTopic1.pipeInput("E", "1", 0L);
+ inputTopic1.pipeInput("E", "3", 5L);
+ inputTopic1.pipeInput("E", "4", 6L);
+ inputTopic1.pipeInput("E", "2", 3L);
+ inputTopic1.pipeInput("E", "6", 13L);
+ inputTopic1.pipeInput("E", "5", 10L);
+ inputTopic1.pipeInput("E", "7", 4L);
+ inputTopic1.pipeInput("E", "8", 2L); // value "8" appears in none of the expected aggregates below
+ inputTopic1.pipeInput("E", "9", 15L);
+ }
+ final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+ .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
+
+ final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+ actual.sort(comparator); // order by key, then window start; the sort is stable, so results for one window keep their captured order
+ assertEquals(
+ asList( // each "E@t" marker names the input record (key E, timestamp t) whose arrival produced the result that follows
+ // E@0
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1", 0),
+ // E@5
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3", 5),
+ // E@6
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4", 6),
+ // E@3
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2", 6),
+ // E@5
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5),
+ // E@6
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6),
+ // E@3
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13),
+ // E@4
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5+7", 13),
+ // E@3
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13),
+ // E@4
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5+7", 13),
+ // E@4
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5+9", 15),
+ // E@6
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5+9", 15),
+ // E@13
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5+9", 15),
+ // E@10
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6+9", 15),
+ // E@15
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15)),
+ actual
+ );
+ }
+
+ @Test
+ public void testNoGracePeriodLargeInput() { // 10 ms sliding windows, no grace period, timestamps 100-115
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+ final WindowBytesStoreSupplier storeSupplier =
+ inOrderIterator // declared outside this hunk; selects which window store backs the aggregate
+ ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false)
+ : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(storeSupplier)
+ );
+
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic1 =
+ driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+
+ inputTopic1.pipeInput("E", "1", 100L);
+ inputTopic1.pipeInput("E", "3", 105L);
+ inputTopic1.pipeInput("E", "4", 106L);
+ inputTopic1.pipeInput("E", "2", 103L);
+ inputTopic1.pipeInput("E", "6", 113L);
+ inputTopic1.pipeInput("E", "5", 110L);
+ inputTopic1.pipeInput("E", "7", 104L);
+ inputTopic1.pipeInput("E", "8", 102L); // value "8" appears in none of the expected aggregates below
+ inputTopic1.pipeInput("E", "9", 115L);
+ }
+ final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+ .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start());
+
+ final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed();
+ actual.sort(comparator); // order by key, then window start; the sort is stable, so results for one window keep their captured order
+ assertEquals(
+ asList( // each "E@t" marker names the input record (key E, timestamp t) whose arrival produced the result that follows
+ // E@100
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90, 100)), "0+1", 100),
+ // E@105
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95, 105)), "0+1+3", 105),
+ // E@106
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4", 106),
+ // E@103
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4+2", 106),
+ // E@105
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3", 105),
+ // E@106
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4", 106),
+ // E@103
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4+2", 106),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5", 113),
+ // E@104
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113),
+ // E@103
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4", 106),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5", 113),
+ // E@104
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5+7", 113),
+ // E@104
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5+9", 115),
+ // E@106
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4", 106),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5+9", 115),
+ // E@113
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6", 113),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5+9", 115),
+ // E@110
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6", 113),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6+9", 115),
+ // E@115
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(114, 124)), "0+9", 115)),
+ actual
+ );
+ }
+
@Test
public void shouldLogAndMeterWhenSkippingNullKey() {
final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;