You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/01 07:21:10 UTC

[flink] branch master updated (34d581b -> 3b163c7)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 34d581b  [FLINK-20895][table-planner] Support local aggregate push down in table planner
     new a7e1526  [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window
     new 3b163c7  [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger  optimization

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../triggers/ContinuousEventTimeTrigger.java       | 21 +++---
 .../triggers/ContinuousProcessingTimeTrigger.java  | 30 ++++----
 .../ContinuousProcessingTimeTriggerTest.java       | 82 +++++++++++++++++++++-
 3 files changed, 110 insertions(+), 23 deletions(-)

[flink] 02/02: [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b163c715ebda99d58c9161023c260f5ba0ea88a
Author: liliwei <hi...@gmail.com>
AuthorDate: Sat Oct 23 17:17:07 2021 +0800

    [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger  optimization
---
 .../triggers/ContinuousEventTimeTrigger.java        | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4ff2d93..8d0dcb8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -59,12 +59,10 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
             ctx.registerEventTimeTimer(window.maxTimestamp());
         }
 
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-        if (fireTimestamp.get() == null) {
-            long start = timestamp - (timestamp % interval);
-            long nextFireTimestamp = start + interval;
-            ctx.registerEventTimeTimer(nextFireTimestamp);
-            fireTimestamp.add(nextFireTimestamp);
+        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
+        if (fireTimestampState.get() == null) {
+            registerNextFireTimestamp(
+                    timestamp - (timestamp % interval), window, ctx, fireTimestampState);
         }
 
         return TriggerResult.CONTINUE;
@@ -83,8 +81,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 
         if (fireTimestamp != null && fireTimestamp == time) {
             fireTimestampState.clear();
-            fireTimestampState.add(time + interval);
-            ctx.registerEventTimeTimer(time + interval);
+            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
             return TriggerResult.FIRE;
         }
 
@@ -149,4 +146,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
             return Math.min(value1, value2);
         }
     }
+
+    private void registerNextFireTimestamp(
+            long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+            throws Exception {
+        long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp()); // one interval after `time`, capped at window.maxTimestamp()
+        fireTimestampState.add(nextFireTimestamp); // record the pending fire time in the trigger state
+        ctx.registerEventTimeTimer(nextFireTimestamp); // event-time timer for that same timestamp
+    }
 }

[flink] 01/02: [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7e15262cdf33d9587b2cb898ca7384fa5903824
Author: liliwei <hi...@gmail.com>
AuthorDate: Thu Sep 2 03:16:54 2021 +0800

    [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window
---
 .../triggers/ContinuousProcessingTimeTrigger.java  | 30 ++++----
 .../ContinuousProcessingTimeTriggerTest.java       | 82 +++++++++++++++++++++-
 2 files changed, 97 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index c437ba7..e3b1325 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -50,18 +50,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
     @Override
     public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
             throws Exception {
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
 
         timestamp = ctx.getCurrentProcessingTime();
 
-        if (fireTimestamp.get() == null) {
-            long start = timestamp - (timestamp % interval);
-            long nextFireTimestamp = start + interval;
-
-            ctx.registerProcessingTimeTimer(nextFireTimestamp);
-
-            fireTimestamp.add(nextFireTimestamp);
-            return TriggerResult.CONTINUE;
+        if (fireTimestampState.get() == null) {
+            registerNextFireTimestamp(
+                    timestamp - (timestamp % interval), window, ctx, fireTimestampState);
         }
         return TriggerResult.CONTINUE;
     }
@@ -74,12 +69,11 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
     @Override
     public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
             throws Exception {
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
 
-        if (fireTimestamp.get().equals(time)) {
-            fireTimestamp.clear();
-            fireTimestamp.add(time + interval);
-            ctx.registerProcessingTimeTimer(time + interval);
+        if (fireTimestampState.get().equals(time)) {
+            fireTimestampState.clear();
+            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
             return TriggerResult.FIRE;
         }
         return TriggerResult.CONTINUE;
@@ -141,4 +135,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
             return Math.min(value1, value2);
         }
     }
+
+    private void registerNextFireTimestamp(
+            long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+            throws Exception {
+        long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp()); // one interval after `time`, capped at window.maxTimestamp()
+        fireTimestampState.add(nextFireTimestamp); // record the pending fire time in the trigger state
+        ctx.registerProcessingTimeTimer(nextFireTimestamp); // processing-time timer for that same timestamp
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
index a50ee92..e0b58ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.NullByteKeySelector;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -93,6 +94,84 @@ public class ContinuousProcessingTimeTriggerTest {
         }
     }
 
+    /** Verifies that the trigger fires on each 5 ms interval and again at the end of each 10 ms window. */
+    @Test
+    public void testWindowFiring() throws Exception {
+        ContinuousProcessingTimeTrigger<TimeWindow> trigger =
+                ContinuousProcessingTimeTrigger.of(Time.milliseconds(5));
+
+        assertTrue(trigger.canMerge());
+
+        ListStateDescriptor<Integer> stateDesc =
+                new ListStateDescriptor<>(
+                        "window-contents",
+                        BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+        WindowOperator<Byte, Integer, Iterable<Integer>, WindowedInteger, TimeWindow> operator =
+                new WindowOperator<>(
+                        TumblingProcessingTimeWindows.of(Time.milliseconds(10)),
+                        new TimeWindow.Serializer(),
+                        new NullByteKeySelector<>(),
+                        BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                        stateDesc,
+                        new InternalIterableWindowFunction<>(new IntegerSumWindowFunction()),
+                        trigger,
+                        0,
+                        null);
+
+        KeyedOneInputStreamOperatorTestHarness<Byte, Integer, WindowedInteger> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        operator, operator.getKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO);
+
+        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
+
+        testHarness.open();
+
+        // Element 1 arrives at processing time 0 -> window [0, 10).
+        testHarness.getProcessingTimeService().setCurrentTime(0);
+        testHarness.processElement(1, NO_TIMESTAMP);
+
+        // Element 2 arrives at processing time 2 -> still window [0, 10).
+        testHarness.getProcessingTimeService().setCurrentTime(2);
+        testHarness.processElement(2, NO_TIMESTAMP);
+
+        // Advancing to time 5 (one interval) fires window [0, 10); the sum is 1+2=3.
+        testHarness.getProcessingTimeService().setCurrentTime(5);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 3), 9));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // Element 3 arrives at processing time 7 -> window [0, 10).
+        testHarness.getProcessingTimeService().setCurrentTime(7);
+        testHarness.processElement(3, NO_TIMESTAMP);
+
+        // Advancing to time 9 (last millisecond of the window) fires [0, 10) again; the sum is 3+3=6.
+        testHarness.getProcessingTimeService().setCurrentTime(9);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 6), 9));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // Element 3 arrives at processing time 10 -> new window [10, 20).
+        testHarness.getProcessingTimeService().setCurrentTime(10);
+        testHarness.processElement(3, NO_TIMESTAMP);
+
+        // Advancing to time 15 (one interval) fires window [10, 20); the sum is 3.
+        testHarness.getProcessingTimeService().setCurrentTime(15);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 3), 19));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // Element 3 arrives at processing time 18 -> window [10, 20).
+        testHarness.getProcessingTimeService().setCurrentTime(18);
+        testHarness.processElement(3, NO_TIMESTAMP);
+
+        // Advancing to time 20 (past the window end) fires [10, 20) again; the sum is 3+3=6.
+        testHarness.getProcessingTimeService().setCurrentTime(20);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 6), 19));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+    }
+
     @Test
     public void testMergingWindows() throws Exception {
         ContinuousProcessingTimeTrigger<TimeWindow> trigger =
@@ -149,8 +228,9 @@ public class ContinuousProcessingTimeTriggerTest {
         TestHarnessUtil.assertOutputEquals(
                 "Output mismatch", expectedOutput, testHarness.getOutput());
 
-        // There is no on time firing for now.
+        // Firing on time.
         testHarness.getProcessingTimeService().setCurrentTime(15);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11));
         TestHarnessUtil.assertOutputEquals(
                 "Output mismatch", expectedOutput, testHarness.getOutput());