You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/16 09:49:29 UTC

flink git commit: [FLINK-4589] [DataStream API] Fix Merging of Covering Window in MergingWindowSet

Repository: flink
Updated Branches:
  refs/heads/release-1.1 f263b9917 -> c5b391c5e


[FLINK-4589] [DataStream API] Fix Merging of Covering Window in MergingWindowSet

This also adds two new test cases for that problem.

This closes #2476


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5b391c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5b391c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5b391c5

Branch: refs/heads/release-1.1
Commit: c5b391c5e3d748c93f5d9f254869214ce426aedf
Parents: f263b99
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 7 13:51:53 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 15 19:32:42 2016 +0200

----------------------------------------------------------------------
 .../operators/windowing/MergingWindowSet.java   |  9 ++-
 .../windowing/MergingWindowSetTest.java         | 64 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5b391c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index c806d2d..4e19c31 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -201,7 +201,7 @@ public class MergingWindowSet<W extends Window> {
 		}
 
 		// the new window created a new, self-contained window without merging
-		if (resultWindow.equals(newWindow)) {
+		if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
 			this.windows.put(resultWindow, resultWindow);
 		}
 
@@ -225,4 +225,11 @@ public class MergingWindowSet<W extends Window> {
 		 */
 		void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
 	}
+
+	/** Describes this set by printing the current contents of {@code windows}. */
+	@Override
+	public String toString() {
+		StringBuilder text = new StringBuilder("MergingWindowSet{").append("windows=").append(windows);
+		return text.append('}').toString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5b391c5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index 939f13f..e2cb6c8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -205,6 +205,70 @@ public class MergingWindowSetTest {
 		assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(0, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
 	}
 
+	/**
+	 * Tests merging of a large new window that completely covers a single existing window.
+	 */
+	@Test
+	public void testMergeLargeWindowCoveringSingleWindow() throws Exception {
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+		TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+		// add an initial small window; the set is empty, so this must not trigger a merge
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new TimeWindow(1, 2), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2)));
+
+		// add a new window that completely covers the existing window; (1, 2) must remain its state window
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 3), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(0, 3)));
+	}
+
+	/**
+	 * Tests merging of a large new window that completely covers multiple existing windows.
+	 */
+	@Test
+	public void testMergeLargeWindowCoveringMultipleWindows() throws Exception {
+		MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+		TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+		// add several non-overlapping initial windows; none of them may trigger a merge
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(1, 3), windowSet.addWindow(new TimeWindow(1, 3), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(1, 3), windowSet.getStateWindow(new TimeWindow(1, 3)));
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(5, 8), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new TimeWindow(5, 8)));
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(10, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction));
+		assertFalse(mergeFunction.hasMerged());
+		assertEquals(new TimeWindow(10, 13), windowSet.getStateWindow(new TimeWindow(10, 13)));
+
+		// add a new window that completely covers the existing windows
+
+		mergeFunction.reset();
+		assertEquals(new TimeWindow(0, 13), windowSet.addWindow(new TimeWindow(0, 13), mergeFunction));
+		assertTrue(mergeFunction.hasMerged());
+		// only (1, 3), (5, 8) and (10, 13) were ever added, so the merged state windows must be two of those
+		assertThat(mergeFunction.mergedStateWindows(), anyOf(
+				containsInAnyOrder(new TimeWindow(1, 3), new TimeWindow(5, 8)),
+				containsInAnyOrder(new TimeWindow(1, 3), new TimeWindow(10, 13)),
+				containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13))));
+		assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(1, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+	}
+
+
 	private static class TestingMergeFunction implements MergingWindowSet.MergeFunction<TimeWindow> {
 		private TimeWindow target = null;
 		private Collection<TimeWindow> sources = null;