You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/01 07:21:11 UTC
[flink] 01/02: [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a7e15262cdf33d9587b2cb898ca7384fa5903824
Author: liliwei <hi...@gmail.com>
AuthorDate: Thu Sep 2 03:16:54 2021 +0800
[FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window
---
.../triggers/ContinuousProcessingTimeTrigger.java | 30 ++++----
.../ContinuousProcessingTimeTriggerTest.java | 82 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 15 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index c437ba7..e3b1325 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -50,18 +50,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
throws Exception {
- ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+ ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
timestamp = ctx.getCurrentProcessingTime();
- if (fireTimestamp.get() == null) {
- long start = timestamp - (timestamp % interval);
- long nextFireTimestamp = start + interval;
-
- ctx.registerProcessingTimeTimer(nextFireTimestamp);
-
- fireTimestamp.add(nextFireTimestamp);
- return TriggerResult.CONTINUE;
+ if (fireTimestampState.get() == null) {
+ registerNextFireTimestamp(
+ timestamp - (timestamp % interval), window, ctx, fireTimestampState);
}
return TriggerResult.CONTINUE;
}
@@ -74,12 +69,11 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception {
- ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+ ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
- if (fireTimestamp.get().equals(time)) {
- fireTimestamp.clear();
- fireTimestamp.add(time + interval);
- ctx.registerProcessingTimeTimer(time + interval);
+ if (fireTimestampState.get().equals(time)) {
+ fireTimestampState.clear();
+ registerNextFireTimestamp(time, window, ctx, fireTimestampState);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
@@ -141,4 +135,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
return Math.min(value1, value2);
}
}
+
+ private void registerNextFireTimestamp(
+ long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+ throws Exception {
+ long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
+ fireTimestampState.add(nextFireTimestamp);
+ ctx.registerProcessingTimeTimer(nextFireTimestamp);
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
index a50ee92..e0b58ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -93,6 +94,84 @@ public class ContinuousProcessingTimeTriggerTest {
}
}
+ /** Verifies that ContinuousProcessingTimeTrigger fires at each interval and at the end of the window. */
+ @Test
+ public void testWindowFiring() throws Exception {
+ ContinuousProcessingTimeTrigger<TimeWindow> trigger =
+ ContinuousProcessingTimeTrigger.of(Time.milliseconds(5));
+
+ assertTrue(trigger.canMerge());
+
+ ListStateDescriptor<Integer> stateDesc =
+ new ListStateDescriptor<>(
+ "window-contents",
+ BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<Byte, Integer, Iterable<Integer>, WindowedInteger, TimeWindow> operator =
+ new WindowOperator<>(
+ TumblingProcessingTimeWindows.of(Time.milliseconds(10)),
+ new TimeWindow.Serializer(),
+ new NullByteKeySelector<>(),
+ BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new IntegerSumWindowFunction()),
+ trigger,
+ 0,
+ null);
+
+ KeyedOneInputStreamOperatorTestHarness<Byte, Integer, WindowedInteger> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, operator.getKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO);
+
+ ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
+
+ testHarness.open();
+
+ // window [0, 10)
+ testHarness.getProcessingTimeService().setCurrentTime(0);
+ testHarness.processElement(1, NO_TIMESTAMP);
+
+ // window [0, 10)
+ testHarness.getProcessingTimeService().setCurrentTime(2);
+ testHarness.processElement(2, NO_TIMESTAMP);
+
+ // Fire window [0, 10), value is 1+2=3.
+ testHarness.getProcessingTimeService().setCurrentTime(5);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 3), 9));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+
+ // window [0, 10)
+ testHarness.getProcessingTimeService().setCurrentTime(7);
+ testHarness.processElement(3, NO_TIMESTAMP);
+
+ // Fire window [0, 10), value is 3+3=6.
+ testHarness.getProcessingTimeService().setCurrentTime(9);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 6), 9));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+
+ // window [10, 20)
+ testHarness.getProcessingTimeService().setCurrentTime(10);
+ testHarness.processElement(3, NO_TIMESTAMP);
+
+ // Fire window [10, 20), value is 3.
+ testHarness.getProcessingTimeService().setCurrentTime(15);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 3), 19));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+
+ // window [10, 20)
+ testHarness.getProcessingTimeService().setCurrentTime(18);
+ testHarness.processElement(3, NO_TIMESTAMP);
+
+ // Fire window [10, 20), value is 3+3=6.
+ testHarness.getProcessingTimeService().setCurrentTime(20);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 6), 19));
+ TestHarnessUtil.assertOutputEquals(
+ "Output mismatch", expectedOutput, testHarness.getOutput());
+ }
+
@Test
public void testMergingWindows() throws Exception {
ContinuousProcessingTimeTrigger<TimeWindow> trigger =
@@ -149,8 +228,9 @@ public class ContinuousProcessingTimeTriggerTest {
TestHarnessUtil.assertOutputEquals(
"Output mismatch", expectedOutput, testHarness.getOutput());
- // There is no on time firing for now.
+ // Firing on time.
testHarness.getProcessingTimeService().setCurrentTime(15);
+ expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11));
TestHarnessUtil.assertOutputEquals(
"Output mismatch", expectedOutput, testHarness.getOutput());