You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:24 UTC
[13/50] [abbrv] flink git commit: [FLINK-5972] Don't allow shrinking
merging windows
[FLINK-5972] Don't allow shrinking merging windows
This closes #3587.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68289b1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68289b1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68289b1a
Branch: refs/heads/table-retraction
Commit: 68289b1a52db7157d23085850ec947e78e729f01
Parents: 25d52e4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Mar 21 14:58:45 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 23 23:29:02 2017 +0800
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 13 ++++
.../operators/windowing/WindowOperator.java | 15 +++-
.../windowing/WindowOperatorContractTest.java | 80 ++++++++++++++++++++
3 files changed, 107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 951f661..24c8d32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -121,6 +121,19 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
+
+ if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+ throw new UnsupportedOperationException("The end timestamp of an " +
+ "event-time window cannot become earlier than the current watermark " +
+ "by merging. Current watermark: " + internalTimerService.currentWatermark() +
+ " window: " + mergeResult);
+ } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+ throw new UnsupportedOperationException("The end timestamp of a " +
+ "processing-time window cannot become earlier than the current processing time " +
+ "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+ " window: " + mergeResult);
+ }
+
context.key = key;
context.window = mergeResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b4283d8..3745659 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -131,7 +131,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@code window.maxTimestamp + allowedLateness} landmark.
* </ul>
*/
- private final long allowedLateness;
+ protected final long allowedLateness;
/**
* {@link OutputTag} to use for late arriving events. Elements for which
@@ -352,6 +352,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
+
+ if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
+ throw new UnsupportedOperationException("The end timestamp of an " +
+ "event-time window cannot become earlier than the current watermark " +
+ "by merging. Current watermark: " + internalTimerService.currentWatermark() +
+ " window: " + mergeResult);
+ } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
+ throw new UnsupportedOperationException("The end timestamp of a " +
+ "processing-time window cannot become earlier than the current processing time " +
+ "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
+ " window: " + mergeResult);
+ }
+
context.key = key;
context.window = mergeResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/68289b1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index aaea8b1..8aae46a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1479,6 +1479,86 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
@Test
+ public void testRejectShrinkingMergingEventTimeWindows() throws Exception {
+ testRejectShrinkingMergingWindows(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testRejectShrinkingMergingProcessingTimeWindows() throws Exception {
+ testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor());
+ }
+
+ /**
+ * A misbehaving {@code WindowAssigner} can cause a window to become late by merging if
+ * it moves the end-of-window time before the watermark. This verifies that we don't allow that.
+ */
+ void testRejectShrinkingMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
+ int allowedLateness = 10;
+
+ if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+ // we don't have allowed lateness for processing time
+ allowedLateness = 0;
+ }
+
+ MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction);
+
+ testHarness.open();
+
+ timeAdaptor.advanceTime(testHarness, 0);
+
+ assertEquals(0, testHarness.extractOutputStreamRecords().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 22)));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+ timeAdaptor.advanceTime(testHarness, 20);
+
+ // our window should still be there
+ assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // cleanup timer
+
+ // the result timestamp is ... + 2 because a watermark t says no element with
+ // timestamp <= t will come in the future and because window ends are exclusive:
+ // a window (0, 12) will have 11 as maxTimestamp. With the watermark at 20, 10 would
+ // already be considered late
+ shouldMergeWindows(
+ mockAssigner,
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 22), new TimeWindow(0, 25))),
+ new TimeWindow(0, 20 - allowedLateness + 2));
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ // now merge it to a window that is just late
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 25)));
+
+ shouldMergeWindows(
+ mockAssigner,
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+ new ArrayList<>(Arrays.asList(new TimeWindow(0, 20 - allowedLateness + 2), new TimeWindow(0, 25))),
+ new TimeWindow(0, 20 - allowedLateness + 1));
+
+ expectedException.expect(UnsupportedOperationException.class);
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+ }
+
+ @Test
public void testMergingOfExistingEventTimeWindows() throws Exception {
testMergingOfExistingWindows(new EventTimeAdaptor());
}