You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/01 07:21:10 UTC
[flink] branch master updated (34d581b -> 3b163c7)
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 34d581b [FLINK-20895][table-planner] Support local aggregate push down in table planner
new a7e1526 [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window
new 3b163c7 [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../triggers/ContinuousEventTimeTrigger.java | 21 +++---
.../triggers/ContinuousProcessingTimeTrigger.java | 30 ++++----
.../ContinuousProcessingTimeTriggerTest.java | 82 +++++++++++++++++++++-
3 files changed, 110 insertions(+), 23 deletions(-)
[flink] 02/02: [FLINK-20443][API/DataStream]
ContinuousEventTimeTrigger optimization
Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b163c715ebda99d58c9161023c260f5ba0ea88a
Author: liliwei <hi...@gmail.com>
AuthorDate: Sat Oct 23 17:17:07 2021 +0800
[FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization
---
.../triggers/ContinuousEventTimeTrigger.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4ff2d93..8d0dcb8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -59,12 +59,10 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
ctx.registerEventTimeTimer(window.maxTimestamp());
}
- ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
- if (fireTimestamp.get() == null) {
- long start = timestamp - (timestamp % interval);
- long nextFireTimestamp = start + interval;
- ctx.registerEventTimeTimer(nextFireTimestamp);
- fireTimestamp.add(nextFireTimestamp);
+ ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
+ if (fireTimestampState.get() == null) {
+ registerNextFireTimestamp(
+ timestamp - (timestamp % interval), window, ctx, fireTimestampState);
}
return TriggerResult.CONTINUE;
@@ -83,8 +81,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
if (fireTimestamp != null && fireTimestamp == time) {
fireTimestampState.clear();
- fireTimestampState.add(time + interval);
- ctx.registerEventTimeTimer(time + interval);
+ registerNextFireTimestamp(time, window, ctx, fireTimestampState);
return TriggerResult.FIRE;
}
@@ -149,4 +146,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
return Math.min(value1, value2);
}
}
+
+ private void registerNextFireTimestamp(
+ long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+ throws Exception {
+ long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
+ fireTimestampState.add(nextFireTimestamp);
+ ctx.registerEventTimeTimer(nextFireTimestamp);
+ }
}
[flink] 01/02: [FLINK-20443][API/DataStream]
ContinuousProcessingTimeTrigger doesn't fire at the end of the window
Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a7e15262cdf33d9587b2cb898ca7384fa5903824
Author: liliwei <hi...@gmail.com>
AuthorDate: Thu Sep 2 03:16:54 2021 +0800
[FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window
---
.../triggers/ContinuousProcessingTimeTrigger.java | 30 ++++----
.../ContinuousProcessingTimeTriggerTest.java | 82 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 15 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index c437ba7..e3b1325 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -50,18 +50,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
throws Exception {
- ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+ ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
timestamp = ctx.getCurrentProcessingTime();
- if (fireTimestamp.get() == null) {
- long start = timestamp - (timestamp % interval);
- long nextFireTimestamp = start + interval;
-
- ctx.registerProcessingTimeTimer(nextFireTimestamp);
-
- fireTimestamp.add(nextFireTimestamp);
- return TriggerResult.CONTINUE;
+ if (fireTimestampState.get() == null) {
+ registerNextFireTimestamp(
+ timestamp - (timestamp % interval), window, ctx, fireTimestampState);
}
return TriggerResult.CONTINUE;
}
@@ -74,12 +69,11 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception {
- ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+ ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
- if (fireTimestamp.get().equals(time)) {
- fireTimestamp.clear();
- fireTimestamp.add(time + interval);
- ctx.registerProcessingTimeTimer(time + interval);
+ if (fireTimestampState.get().equals(time)) {
+ fireTimestampState.clear();
+ registerNextFireTimestamp(time, window, ctx, fireTimestampState);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
@@ -141,4 +135,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
return Math.min(value1, value2);
}
}
+
+ private void registerNextFireTimestamp(
+ long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+ throws Exception {
+ long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
+ fireTimestampState.add(nextFireTimestamp);
+ ctx.registerProcessingTimeTimer(nextFireTimestamp);
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
index a50ee92..e0b58ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -93,6 +94,84 @@ public class ContinuousProcessingTimeTriggerTest {
}
}
+ /** Verify ContinuousProcessingTimeTrigger fire. */
+ @Test
+ public void testWindowFiring() throws Exception {
+ ContinuousProcessingTimeTrigger<TimeWindow> trigger =
+ ContinuousProcessingTimeTrigger.of(Time.milliseconds(5));
+
+ assertTrue(trigger.canMerge());
+
+ ListStateDescriptor<Integer> stateDesc =
+ new ListStateDescriptor<>(
+ "window-contents",
+ BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<Byte, Integer, Iterable<Integer>, WindowedInteger, TimeWindow> operator =
+ new WindowOperator<>(
+ TumblingProcessingTimeWindows.of(Time.milliseconds(10)),
+ new TimeWindow.Serializer(),
+ new NullByteKeySelector<>(),
+ BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new IntegerSumWindowFunction()),
+ trigger,
+ 0,
+ null);
+
+ KeyedOneInputStreamOperatorTestHarness<Byte, Integer, WindowedInteger> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, operator.getKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO);
+
+ ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
+
+ testHarness.open();
+
+ // window [0, 10)
+ testHarness.getProcessingTimeService().setCurrentTime(0);
+ testHarness.processElement(1, NO_TIMESTAMP);
+
+ // window [0, 10)
+ testHarness.getProcessingTimeService().setCurrentTime(2);
+ testHarness.processElement(2, NO_TIMESTAMP);
+
+ // Fire window [0, 10), value is 1+2=3.
+ testHarness.getProcessingTimeService().setCurrentTime(5);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 3), 9));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+
+ // window [0, 10)
+ testHarness.getProcessingTimeService().setCurrentTime(7);
+ testHarness.processElement(3, NO_TIMESTAMP);
+
+ // Fire window [0, 10), value is 3+3=6.
+ testHarness.getProcessingTimeService().setCurrentTime(9);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 6), 9));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+
+ // window [10, 20)
+ testHarness.getProcessingTimeService().setCurrentTime(10);
+ testHarness.processElement(3, NO_TIMESTAMP);
+
+ // Fire window [10, 20), value is 3.
+ testHarness.getProcessingTimeService().setCurrentTime(15);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 3), 19));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+
+ // window [10, 20)
+ testHarness.getProcessingTimeService().setCurrentTime(18);
+ testHarness.processElement(3, NO_TIMESTAMP);
+
+ // Fire window [10, 20), value is 3+3=6.
+ testHarness.getProcessingTimeService().setCurrentTime(20);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 6), 19));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+ }
+
@Test
public void testMergingWindows() throws Exception {
ContinuousProcessingTimeTrigger<TimeWindow> trigger =
@@ -149,8 +228,9 @@ public class ContinuousProcessingTimeTriggerTest {
TestHarnessUtil.assertOutputEquals(
"Output mismatch", expectedOutput, testHarness.getOutput());
- // There is no on time firing for now.
+ // Firing on time.
testHarness.getProcessingTimeService().setCurrentTime(15);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11));
TestHarnessUtil.assertOutputEquals(
"Output mismatch", expectedOutput, testHarness.getOutput());