You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/23 12:26:27 UTC

flink git commit: [hotfix] Make MergingWindowSet resilient to misbehaving window assigners

Repository: flink
Updated Branches:
  refs/heads/master df3f11979 -> 16d33d0dd


[hotfix] Make MergingWindowSet resilient to misbehaving window assigners


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16d33d0d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16d33d0d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16d33d0d

Branch: refs/heads/master
Commit: 16d33d0dd169baf06317ff8f67f14e113d0dcefb
Parents: df3f119
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 4 10:17:55 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 12:18:27 2017 +0100

----------------------------------------------------------------------
 .../operators/windowing/MergingWindowSet.java   |   4 +-
 .../windowing/MergingWindowSetTest.java         | 108 +++++++++++++++++++
 2 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16d33d0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index 06cacad..b79a3fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -171,6 +171,7 @@ public class MergingWindowSet<W extends Window> {
 				});
 
 		W resultWindow = newWindow;
+		boolean mergedNewWindow = false;
 
 		// perform the merge
 		for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
@@ -180,6 +181,7 @@ public class MergingWindowSet<W extends Window> {
 			// if our new window is in the merged windows make the merge result the
 			// result window
 			if (mergedWindows.remove(newWindow)) {
+				mergedNewWindow = true;
 				resultWindow = mergeResult;
 			}
 
@@ -213,7 +215,7 @@ public class MergingWindowSet<W extends Window> {
 		}
 
 		// the new window created a new, self-contained window without merging
-		if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
+		if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
 			this.mapping.put(resultWindow, resultWindow);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/16d33d0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index 7c1fa93..46169a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -18,15 +18,25 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.junit.Test;
 import org.mockito.Matchers;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -44,6 +54,40 @@ import static org.mockito.Mockito.*;
  */
 public class MergingWindowSetTest {
 
+	/**
+	 * This test uses a special (misbehaving) {@code MergingWindowAssigner} that produces cases
+	 * where windows that don't overlap with the newly added window are being merged. We verify
+	 * that the merging window set is nevertheless correct and contains all added windows.
+	 */
+	@Test
+	public void testNonEagerMerging() throws Exception {
+		@SuppressWarnings("unchecked")
+		ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+		MergingWindowSet<TimeWindow> windowSet =
+				new MergingWindowSet<>(new NonEagerlyMergingWindowAssigner(3000), mockState);
+
+		TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+		TimeWindow result;
+
+		mergeFunction.reset();
+		result = windowSet.addWindow(new TimeWindow(0, 2), mergeFunction);
+		assertNotNull(windowSet.getStateWindow(result));
+
+		mergeFunction.reset();
+		result = windowSet.addWindow(new TimeWindow(2, 5), mergeFunction);
+		assertNotNull(windowSet.getStateWindow(result));
+
+		mergeFunction.reset();
+		result = windowSet.addWindow(new TimeWindow(1, 2), mergeFunction);
+		assertNotNull(windowSet.getStateWindow(result));
+
+		mergeFunction.reset();
+		result = windowSet.addWindow(new TimeWindow(10, 12), mergeFunction);
+		assertNotNull(windowSet.getStateWindow(result));
+	}
+
 	@Test
 	public void testIncrementalMerging() throws Exception {
 		@SuppressWarnings("unchecked")
@@ -392,4 +436,68 @@ public class MergingWindowSetTest {
 			this.sources = mergedWindows;
 		}
 	}
+
+	/**
+	 * A special {@link MergingWindowAssigner} that let's windows get larger which leads to windows
+	 * being merged lazily.
+	 */
+	static class NonEagerlyMergingWindowAssigner extends MergingWindowAssigner<Object, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		protected long sessionTimeout;
+
+		public NonEagerlyMergingWindowAssigner(long sessionTimeout) {
+			this.sessionTimeout = sessionTimeout;
+		}
+
+		@Override
+		public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+			return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+		}
+
+		@Override
+		public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+			return EventTimeTrigger.create();
+		}
+
+		@Override
+		public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+			return new TimeWindow.Serializer();
+		}
+
+		@Override
+		public boolean isEventTime() {
+			return true;
+		}
+
+		/**
+		 * Merge overlapping {@link TimeWindow}s.
+		 */
+		public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
+
+			TimeWindow earliestStart = null;
+
+			for (TimeWindow win : windows) {
+				if (earliestStart == null) {
+					earliestStart = win;
+				} else if (win.getStart() < earliestStart.getStart()) {
+					earliestStart = win;
+				}
+			}
+
+			List<TimeWindow> associatedWindows = new ArrayList<>();
+
+			for (TimeWindow win : windows) {
+				if (win.getStart() < earliestStart.getEnd() && win.getStart() >= earliestStart.getStart()) {
+					associatedWindows.add(win);
+				}
+			}
+
+			TimeWindow target = new TimeWindow(earliestStart.getStart(), earliestStart.getEnd() + 1);
+
+			if (associatedWindows.size() > 1) {
+				c.merge(associatedWindows, target);
+			}
+		}
+	}
 }