You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/01 07:21:11 UTC

[flink] 01/02: [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7e15262cdf33d9587b2cb898ca7384fa5903824
Author: liliwei <hi...@gmail.com>
AuthorDate: Thu Sep 2 03:16:54 2021 +0800

    [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window
---
 .../triggers/ContinuousProcessingTimeTrigger.java  | 30 ++++----
 .../ContinuousProcessingTimeTriggerTest.java       | 82 +++++++++++++++++++++-
 2 files changed, 97 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index c437ba7..e3b1325 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -50,18 +50,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
     @Override
     public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
             throws Exception {
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
 
         timestamp = ctx.getCurrentProcessingTime();
 
-        if (fireTimestamp.get() == null) {
-            long start = timestamp - (timestamp % interval);
-            long nextFireTimestamp = start + interval;
-
-            ctx.registerProcessingTimeTimer(nextFireTimestamp);
-
-            fireTimestamp.add(nextFireTimestamp);
-            return TriggerResult.CONTINUE;
+        if (fireTimestampState.get() == null) {
+            registerNextFireTimestamp(
+                    timestamp - (timestamp % interval), window, ctx, fireTimestampState);
         }
         return TriggerResult.CONTINUE;
     }
@@ -74,12 +69,11 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
     @Override
     public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
             throws Exception {
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
 
-        if (fireTimestamp.get().equals(time)) {
-            fireTimestamp.clear();
-            fireTimestamp.add(time + interval);
-            ctx.registerProcessingTimeTimer(time + interval);
+        if (fireTimestampState.get().equals(time)) {
+            fireTimestampState.clear();
+            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
             return TriggerResult.FIRE;
         }
         return TriggerResult.CONTINUE;
@@ -141,4 +135,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
             return Math.min(value1, value2);
         }
     }
+
+    private void registerNextFireTimestamp(
+            long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+            throws Exception {
+        long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
+        fireTimestampState.add(nextFireTimestamp);
+        ctx.registerProcessingTimeTimer(nextFireTimestamp);
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
index a50ee92..e0b58ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.NullByteKeySelector;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -93,6 +94,84 @@ public class ContinuousProcessingTimeTriggerTest {
         }
     }
 
+    /** Verify ContinuousProcessingTimeTrigger fire. */
+    @Test
+    public void testWindowFiring() throws Exception {
+        ContinuousProcessingTimeTrigger<TimeWindow> trigger =
+                ContinuousProcessingTimeTrigger.of(Time.milliseconds(5));
+
+        assertTrue(trigger.canMerge());
+
+        ListStateDescriptor<Integer> stateDesc =
+                new ListStateDescriptor<>(
+                        "window-contents",
+                        BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+        WindowOperator<Byte, Integer, Iterable<Integer>, WindowedInteger, TimeWindow> operator =
+                new WindowOperator<>(
+                        TumblingProcessingTimeWindows.of(Time.milliseconds(10)),
+                        new TimeWindow.Serializer(),
+                        new NullByteKeySelector<>(),
+                        BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                        stateDesc,
+                        new InternalIterableWindowFunction<>(new IntegerSumWindowFunction()),
+                        trigger,
+                        0,
+                        null);
+
+        KeyedOneInputStreamOperatorTestHarness<Byte, Integer, WindowedInteger> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        operator, operator.getKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO);
+
+        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
+
+        testHarness.open();
+
+        // window [0, 10)
+        testHarness.getProcessingTimeService().setCurrentTime(0);
+        testHarness.processElement(1, NO_TIMESTAMP);
+
+        // window [0, 10)
+        testHarness.getProcessingTimeService().setCurrentTime(2);
+        testHarness.processElement(2, NO_TIMESTAMP);
+
+        // Fire window [0, 10), value is 1+2=3.
+        testHarness.getProcessingTimeService().setCurrentTime(5);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 3), 9));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // window [0, 10)
+        testHarness.getProcessingTimeService().setCurrentTime(7);
+        testHarness.processElement(3, NO_TIMESTAMP);
+
+        // Fire window [0, 10), value is 3+3=6.
+        testHarness.getProcessingTimeService().setCurrentTime(9);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 6), 9));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // window [10, 20)
+        testHarness.getProcessingTimeService().setCurrentTime(10);
+        testHarness.processElement(3, NO_TIMESTAMP);
+
+        // Fire window [10, 20), value is 3.
+        testHarness.getProcessingTimeService().setCurrentTime(15);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 3), 19));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // window [10, 20)
+        testHarness.getProcessingTimeService().setCurrentTime(18);
+        testHarness.processElement(3, NO_TIMESTAMP);
+
+        // Fire window [10, 20), value is 3+3=6.
+        testHarness.getProcessingTimeService().setCurrentTime(20);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 6), 19));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+    }
+
     @Test
     public void testMergingWindows() throws Exception {
         ContinuousProcessingTimeTrigger<TimeWindow> trigger =
@@ -149,8 +228,9 @@ public class ContinuousProcessingTimeTriggerTest {
         TestHarnessUtil.assertOutputEquals(
                 "Output mismatch", expectedOutput, testHarness.getOutput());
 
-        // There is no on time firing for now.
+        // Firing on time.
         testHarness.getProcessingTimeService().setCurrentTime(15);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11));
         TestHarnessUtil.assertOutputEquals(
                 "Output mismatch", expectedOutput, testHarness.getOutput());