You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/06/28 22:40:56 UTC

[kafka] branch trunk updated: KAFKA-12718: SessionWindows are closed too early (#10824)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfcabc3  KAFKA-12718: SessionWindows are closed too early (#10824)
cfcabc3 is described below

commit cfcabc368c3f0a123c4561059b043648d79b1df8
Author: Juan Gonzalez-Zurita <55...@users.noreply.github.com>
AuthorDate: Mon Jun 28 18:39:49 2021 -0400

    KAFKA-12718: SessionWindows are closed too early (#10824)
    
    Session windows should not be closed as soon as the "window end" time is reached; instead, the "window close" time should be "window-end + gap + grace-period".
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>
---
 .../kstream/internals/KStreamSessionWindowAggregate.java |  2 +-
 .../KStreamSessionWindowAggregateProcessorTest.java      | 16 ++++++++--------
 .../streams/kstream/internals/SuppressScenarioTest.java  |  6 +++---
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 5c84369..f15997f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -106,7 +106,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
 
             final long timestamp = context().timestamp();
             observedStreamTime = Math.max(observedStreamTime, timestamp);
-            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
 
             final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 4ab8a45..d33f747 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -421,8 +421,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
         processor.process("OnTime1", "1");
 
-        // dummy record to advance stream time = 1
-        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
+        // dummy record to advance stream time = 11, 10 for gap time plus 1 to place outside window
+        context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
         processor.process("dummy", "dummy");
 
         try (final LogCaptureAppender appender =
@@ -435,7 +435,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
             assertThat(
                 appender.getMessages(),
                 hasItem("Skipping record for expired window." +
-                    " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]")
+                    " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]")
             );
         }
 
@@ -489,16 +489,16 @@ public class KStreamSessionWindowAggregateProcessorTest {
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("OnTime1", "1");
 
-            // dummy record to advance stream time = 1
-            context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
+            // dummy record to advance stream time = 11, 10 for gap time plus 1 to place at edge of window
+            context.setRecordContext(new ProcessorRecordContext(11, -2, -3, "topic", new RecordHeaders()));
             processor.process("dummy", "dummy");
 
             // delayed record arrives on time, should not be skipped
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("OnTime2", "1");
 
-            // dummy record to advance stream time = 2
-            context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", new RecordHeaders()));
+            // dummy record to advance stream time = 12, 10 for gap time plus 2 to place outside window
+            context.setRecordContext(new ProcessorRecordContext(12, -2, -3, "topic", new RecordHeaders()));
             processor.process("dummy", "dummy");
 
             // delayed record arrives late
@@ -508,7 +508,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
             assertThat(
                 appender.getMessages(),
                 hasItem("Skipping record for expired window." +
-                    " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]")
+                    " key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]")
             );
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 46a8ab8..e0b7957 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -581,7 +581,7 @@ public class SuppressScenarioTest {
             // arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
             inputTopic.pipeInput("k1", "v1", 1L);
             // any record in the same partition advances stream time (note the key is different)
-            inputTopic.pipeInput("k2", "v1", 6L);
+            inputTopic.pipeInput("k2", "v1", 11L);
             // late event for first window - this should get dropped from all streams, since the first window is now closed.
             inputTopic.pipeInput("k1", "v1", 5L);
             // just pushing stream time forward to flush the other events through.
@@ -594,7 +594,7 @@ public class SuppressScenarioTest {
                     new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
                     new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
                     new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
-                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
+                    new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L),
                     new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
                 )
             );
@@ -602,7 +602,7 @@ public class SuppressScenarioTest {
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
-                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
+                    new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L)
                 )
             );
         }