You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:28:06 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #10824: KAFKA-12718 [WIP]: SessionWindows are closed too early

mjsax commented on a change in pull request #10824:
URL: https://github.com/apache/kafka/pull/10824#discussion_r654149249



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -421,8 +421,8 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
         processor.process("OnTime1", "1");
 
-        // dummy record to advance stream time = 1
-        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
+        // dummy record to advance stream time = 11
+        context.setRecordContext(new ProcessorRecordContext(10 + 1, -2, -3, "topic", new RecordHeaders()));

Review comment:
       Why `10 + 1`? Just make it `11`. Use the comment to explain the why

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -421,8 +421,8 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));

Review comment:
       Wondering if we should bump this timestamp to 10?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -421,8 +421,8 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));

Review comment:
       Wondering if we should bump this timestamp to 10? (Or course, we would need to bump 11 below to 21 for this case)

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -489,16 +489,16 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("OnTime1", "1");
 
-            // dummy record to advance stream time = 1
-            context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
+            // dummy record to advance stream time = 11
+            context.setRecordContext(new ProcessorRecordContext(10 + 1, -2, -3, "topic", new RecordHeaders()));

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -489,16 +489,16 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("OnTime1", "1");
 
-            // dummy record to advance stream time = 1
-            context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
+            // dummy record to advance stream time = 11
+            context.setRecordContext(new ProcessorRecordContext(10 + 1, -2, -3, "topic", new RecordHeaders()));
             processor.process("dummy", "dummy");
 
             // delayed record arrives on time, should not be skipped
             context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("OnTime2", "1");
 
-            // dummy record to advance stream time = 2
-            context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", new RecordHeaders()));
+            // dummy record to advance stream time = 12
+            context.setRecordContext(new ProcessorRecordContext(10 + 2, -2, -3, "topic", new RecordHeaders()));

Review comment:
       as above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org