You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/23 15:30:10 UTC

[3/3] flink git commit: [FLINK-5972] Don't allow shrinking merging windows

[FLINK-5972] Don't allow shrinking merging windows

This closes #3587.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68289b1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68289b1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68289b1a

Branch: refs/heads/master
Commit: 68289b1a52db7157d23085850ec947e78e729f01
Parents: 25d52e4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 21 14:58:45 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 23 23:29:02 2017 +0800

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 13 ++++
 .../operators/windowing/WindowOperator.java     | 15 +++-
 .../windowing/WindowOperatorContractTest.java   | 80 ++++++++++++++++++++
 3 files changed, 107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 951f661..24c8d32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -121,6 +121,19 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 							public void merge(W mergeResult,
 									Collection<W> mergedWindows, W stateWindowResult,
 									Collection<W> mergedStateWindows) throws Exception {
+
+								if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+									throw new UnsupportedOperationException("The end timestamp of an " +
+											"event-time window cannot become earlier than the current watermark " +
+											"by merging. Current watermark: " + internalTimerService.currentWatermark() +
+											" window: " + mergeResult);
+								} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+									throw new UnsupportedOperationException("The end timestamp of a " +
+											"processing-time window cannot become earlier than the current processing time " +
+											"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+											" window: " + mergeResult);
+								}
+
 								context.key = key;
 								context.window = mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b4283d8..3745659 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -131,7 +131,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 *         {@code window.maxTimestamp + allowedLateness} landmark.
 	 * </ul>
 	 */
-	private final long allowedLateness;
+	protected final long allowedLateness;
 
 	/**
 	 * {@link OutputTag} to use for late arriving events. Elements for which
@@ -352,6 +352,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					public void merge(W mergeResult,
 							Collection<W> mergedWindows, W stateWindowResult,
 							Collection<W> mergedStateWindows) throws Exception {
+
+						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+							throw new UnsupportedOperationException("The end timestamp of an " +
+									"event-time window cannot become earlier than the current watermark " +
+									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
+									" window: " + mergeResult);
+						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+							throw new UnsupportedOperationException("The end timestamp of a " +
+									"processing-time window cannot become earlier than the current processing time " +
+									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+									" window: " + mergeResult);
+						}
+
 						context.key = key;
 						context.window = mergeResult;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index aaea8b1..8aae46a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1479,6 +1479,86 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	}
 
 	@Test
+	public void testRejectShrinkingMergingEventTimeWindows() throws Exception {
+		testRejectShrinkingMergingWindows(new EventTimeAdaptor());
+	}
+
+	@Test
+	public void testRejectShrinkingMergingProcessingTimeWindows() throws Exception {
+		testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor());
+	}
+
+	/**
+	 * A misbehaving {@code WindowAssigner} can cause a window to become late by merging if it moves
+	 * the end-of-window time before the watermark (or the current processing time). This verifies that such a merge is rejected.
+	 */
+	void testRejectShrinkingMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+		int allowedLateness = 10;
+
+		if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+			// we don't have allowed lateness for processing time
+			allowedLateness = 0;
+		}
+
+		MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+		timeAdaptor.setIsEventTime(mockAssigner);
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+				createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction);
+
+		testHarness.open();
+
+		timeAdaptor.advanceTime(testHarness, 0);
+
+		assertEquals(0, testHarness.extractOutputStreamRecords().size());
+		assertEquals(0, testHarness.numKeyedStateEntries());
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(0, 22)));
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+		assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+		timeAdaptor.advanceTime(testHarness, 20);
+
+		// our window should still be there
+		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+		assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+		// the merged window ends at "... + 2" because a watermark t says that no element with
+		// timestamp <= t will come in the future and because window ends are exclusive:
+		// a window (0, 12) has 11 as maxTimestamp. With an allowed lateness of 10 and the
+		// watermark at 20, a window with maxTimestamp 10 would already be considered late
+		shouldMergeWindows(
+				mockAssigner,
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+				new TimeWindow(0, 20 - allowedLateness + 2));
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		// now merge it to a window that is just late
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+				.thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+		shouldMergeWindows(
+				mockAssigner,
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+				new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+				new TimeWindow(0, 20 - allowedLateness + 1));
+
+		expectedException.expect(UnsupportedOperationException.class);
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+	}
+
+	@Test
 	public void testMergingOfExistingEventTimeWindows() throws Exception {
 		testMergingOfExistingWindows(new EventTimeAdaptor());
 	}