You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/01 07:21:12 UTC
[flink] 02/02: [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b163c715ebda99d58c9161023c260f5ba0ea88a
Author: liliwei <hi...@gmail.com>
AuthorDate: Sat Oct 23 17:17:07 2021 +0800
[FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization
---
.../triggers/ContinuousEventTimeTrigger.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4ff2d93..8d0dcb8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -59,12 +59,10 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
ctx.registerEventTimeTimer(window.maxTimestamp());
}
- ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
- if (fireTimestamp.get() == null) {
- long start = timestamp - (timestamp % interval);
- long nextFireTimestamp = start + interval;
- ctx.registerEventTimeTimer(nextFireTimestamp);
- fireTimestamp.add(nextFireTimestamp);
+ ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
+ if (fireTimestampState.get() == null) {
+ registerNextFireTimestamp(
+ timestamp - (timestamp % interval), window, ctx, fireTimestampState);
}
return TriggerResult.CONTINUE;
@@ -83,8 +81,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
if (fireTimestamp != null && fireTimestamp == time) {
fireTimestampState.clear();
- fireTimestampState.add(time + interval);
- ctx.registerEventTimeTimer(time + interval);
+ registerNextFireTimestamp(time, window, ctx, fireTimestampState);
return TriggerResult.FIRE;
}
@@ -149,4 +146,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
return Math.min(value1, value2);
}
}
+
+ private void registerNextFireTimestamp(
+ long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+ throws Exception {
+ long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
+ fireTimestampState.add(nextFireTimestamp);
+ ctx.registerEventTimeTimer(nextFireTimestamp);
+ }
}