You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/01 07:21:12 UTC

[flink] 02/02: [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b163c715ebda99d58c9161023c260f5ba0ea88a
Author: liliwei <hi...@gmail.com>
AuthorDate: Sat Oct 23 17:17:07 2021 +0800

    [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization
---
 .../triggers/ContinuousEventTimeTrigger.java        | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4ff2d93..8d0dcb8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -59,12 +59,10 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
             ctx.registerEventTimeTimer(window.maxTimestamp());
         }
 
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-        if (fireTimestamp.get() == null) {
-            long start = timestamp - (timestamp % interval);
-            long nextFireTimestamp = start + interval;
-            ctx.registerEventTimeTimer(nextFireTimestamp);
-            fireTimestamp.add(nextFireTimestamp);
+        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
+        if (fireTimestampState.get() == null) {
+            registerNextFireTimestamp(
+                    timestamp - (timestamp % interval), window, ctx, fireTimestampState);
         }
 
         return TriggerResult.CONTINUE;
@@ -83,8 +81,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 
         if (fireTimestamp != null && fireTimestamp == time) {
             fireTimestampState.clear();
-            fireTimestampState.add(time + interval);
-            ctx.registerEventTimeTimer(time + interval);
+            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
             return TriggerResult.FIRE;
         }
 
@@ -149,4 +146,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
             return Math.min(value1, value2);
         }
     }
+
+    private void registerNextFireTimestamp(
+            long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+            throws Exception {
+        long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
+        fireTimestampState.add(nextFireTimestamp);
+        ctx.registerEventTimeTimer(nextFireTimestamp);
+    }
 }