You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/05/14 08:10:20 UTC

[1/2] flink git commit: [FLINK-9201] Fix merged window being fired twice if the watermark has already passed the end of the merged window

Repository: flink
Updated Branches:
  refs/heads/master f057ca9d9 -> 64f32f929


[FLINK-9201] Fix merged window being fired twice if the watermark has already passed the end of the merged window


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3afb7b35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3afb7b35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3afb7b35

Branch: refs/heads/master
Commit: 3afb7b3512353e28cd2fb7f49cd88c8caaa0da0c
Parents: f057ca9
Author: yuemeng <hz...@corp.netease.com>
Authored: Thu Apr 26 16:55:05 2018 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon May 14 10:09:30 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/windowing/triggers/EventTimeTrigger.java   | 8 +++++++-
 .../api/windowing/triggers/ProcessingTimeTrigger.java        | 8 +++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3afb7b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 2f8f16f..2066bba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -69,7 +69,13 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
 	@Override
 	public void onMerge(TimeWindow window,
 			OnMergeContext ctx) {
-		ctx.registerEventTimeTimer(window.maxTimestamp());
+		// only register a timer if the watermark is not yet past the end of the merged window
+		// this is in line with the logic in onElement(). If the watermark is past the end of
+		// the window onElement() will fire and setting a timer here would fire the window twice.
+		long windowMaxTimestamp = window.maxTimestamp();
+		if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
+			ctx.registerEventTimeTimer(windowMaxTimestamp);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3afb7b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index cd7869e..c8e6e6c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -60,7 +60,13 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 	@Override
 	public void onMerge(TimeWindow window,
 			OnMergeContext ctx) {
-		ctx.registerProcessingTimeTimer(window.maxTimestamp());
+		// only register a timer if the time is not yet past the end of the merged window
+		// this is in line with the logic in onElement(). If the time is past the end of
+		// the window onElement() will fire and setting a timer here would fire the window twice.
+		long windowMaxTimestamp = window.maxTimestamp();
+		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
+			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
+		}
 	}
 
 	@Override


[2/2] flink git commit: [FLINK-9201] Add trigger tests for late-window merging

Posted by al...@apache.org.
[FLINK-9201] Add trigger tests for late-window merging

This closes #5917


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64f32f92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64f32f92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64f32f92

Branch: refs/heads/master
Commit: 64f32f9299bfdb7e7b1fcd759618c37638a6a5b7
Parents: 3afb7b3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon May 14 10:08:11 2018 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon May 14 10:09:56 2018 +0200

----------------------------------------------------------------------
 .../windowing/EventTimeTriggerTest.java         | 39 ++++++++++++++++++++
 .../windowing/ProcessingTimeTriggerTest.java    | 38 +++++++++++++++++++
 2 files changed, 77 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64f32f92/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
index f54367b..eb14561 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
@@ -153,4 +153,43 @@ public class EventTimeTriggerTest {
 		assertEquals(0, testHarness.numProcessingTimeTimers());
 		assertEquals(0, testHarness.numEventTimeTimers());
 	}
+
+	/**
+	 * Merging a late window should not register a timer, otherwise we would get two firings:
+	 * one from onElement() on the merged window and one from the timer.
+	 */
+	@Test
+	public void testMergingLateWindows() throws Exception {
+
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+			new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+		assertTrue(EventTimeTrigger.create().canMerge());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(2, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.advanceWatermark(10);
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+		assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 4)));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/64f32f92/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
index 7e78854..7336c88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java
@@ -134,4 +134,42 @@ public class ProcessingTimeTriggerTest {
 		assertEquals(0, testHarness.numProcessingTimeTimers());
 		assertEquals(0, testHarness.numEventTimeTimers());
 	}
+
+	/**
+	 * Merging a late window should not register a timer, otherwise we would get two firings:
+	 * one from onElement() on the merged window and one from the timer.
+	 */
+	@Test
+	public void testMergingLateWindows() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+			new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer());
+
+		assertTrue(ProcessingTimeTrigger.create().canMerge());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(2, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.advanceProcessingTime(10);
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numEventTimeTimers());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4)));
+		assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 4)));
+	}
 }