You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/16 09:49:29 UTC
flink git commit: [FLINK-4589] [DataStream API] Fix Merging of
Covering Window in MergingWindowSet
Repository: flink
Updated Branches:
refs/heads/release-1.1 f263b9917 -> c5b391c5e
[FLINK-4589] [DataStream API] Fix Merging of Covering Window in MergingWindowSet
This also adds two new test cases for that problem.
This closes #2476
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5b391c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5b391c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5b391c5
Branch: refs/heads/release-1.1
Commit: c5b391c5e3d748c93f5d9f254869214ce426aedf
Parents: f263b99
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 7 13:51:53 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 15 19:32:42 2016 +0200
----------------------------------------------------------------------
.../operators/windowing/MergingWindowSet.java | 9 ++-
.../windowing/MergingWindowSetTest.java | 64 ++++++++++++++++++++
2 files changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c5b391c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index c806d2d..4e19c31 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -201,7 +201,7 @@ public class MergingWindowSet<W extends Window> {
}
// the new window created a new, self-contained window without merging
- if (resultWindow.equals(newWindow)) {
+ if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
this.windows.put(resultWindow, resultWindow);
}
@@ -225,4 +225,11 @@ public class MergingWindowSet<W extends Window> {
*/
void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
}
+
+ @Override
+ public String toString() {
+ return "MergingWindowSet{" +
+ "windows=" + windows +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c5b391c5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index 939f13f..e2cb6c8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -205,6 +205,70 @@ public class MergingWindowSetTest {
assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(0, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
}
+ /**
+ * Test merging of a large new window that covers one existing windows.
+ */
+ @Test
+ public void testMergeLargeWindowCoveringSingleWindow() throws Exception {
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+ TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+ // add an initial small window
+
+ mergeFunction.reset();
+ assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new TimeWindow(1, 2), mergeFunction));
+ assertFalse(mergeFunction.hasMerged());
+ assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2)));
+
+ // add a new window that completely covers the existing window
+
+ mergeFunction.reset();
+ assertEquals(new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 3), mergeFunction));
+ assertTrue(mergeFunction.hasMerged());
+ assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(0, 3)));
+ }
+
+ /**
+ * Test merging of a large new window that covers multiple existing windows.
+ */
+ @Test
+ public void testMergeLargeWindowCoveringMultipleWindows() throws Exception {
+ MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));
+
+ TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+ // add several non-overlapping initial windoww
+
+ mergeFunction.reset();
+ assertEquals(new TimeWindow(1, 3), windowSet.addWindow(new TimeWindow(1, 3), mergeFunction));
+ assertFalse(mergeFunction.hasMerged());
+ assertEquals(new TimeWindow(1, 3), windowSet.getStateWindow(new TimeWindow(1, 3)));
+
+ mergeFunction.reset();
+ assertEquals(new TimeWindow(5, 8), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction));
+ assertFalse(mergeFunction.hasMerged());
+ assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new TimeWindow(5, 8)));
+
+ mergeFunction.reset();
+ assertEquals(new TimeWindow(10, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction));
+ assertFalse(mergeFunction.hasMerged());
+ assertEquals(new TimeWindow(10, 13), windowSet.getStateWindow(new TimeWindow(10, 13)));
+
+ // add a new window that completely covers the existing windows
+
+ mergeFunction.reset();
+ assertEquals(new TimeWindow(0, 13), windowSet.addWindow(new TimeWindow(0, 13), mergeFunction));
+ assertTrue(mergeFunction.hasMerged());
+ assertThat(mergeFunction.mergedStateWindows(), anyOf(
+ containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(5, 8)),
+ containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(10, 13)),
+ containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13))));
+ assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(1, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
+
+ }
+
+
private static class TestingMergeFunction implements MergingWindowSet.MergeFunction<TimeWindow> {
private TimeWindow target = null;
private Collection<TimeWindow> sources = null;