You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/23 12:26:27 UTC
flink git commit: [hotfix] Make MergingWindowSet resilient to
misbehaving window assigners
Repository: flink
Updated Branches:
refs/heads/master df3f11979 -> 16d33d0dd
[hotfix] Make MergingWindowSet resilient to misbehaving window assigners
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16d33d0d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16d33d0d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16d33d0d
Branch: refs/heads/master
Commit: 16d33d0dd169baf06317ff8f67f14e113d0dcefb
Parents: df3f119
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 4 10:17:55 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 12:18:27 2017 +0100
----------------------------------------------------------------------
.../operators/windowing/MergingWindowSet.java | 4 +-
.../windowing/MergingWindowSetTest.java | 108 +++++++++++++++++++
2 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/16d33d0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index 06cacad..b79a3fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -171,6 +171,7 @@ public class MergingWindowSet<W extends Window> {
});
W resultWindow = newWindow;
+ boolean mergedNewWindow = false;
// perform the merge
for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
@@ -180,6 +181,7 @@ public class MergingWindowSet<W extends Window> {
// if our new window is in the merged windows make the merge result the
// result window
if (mergedWindows.remove(newWindow)) {
+ mergedNewWindow = true;
resultWindow = mergeResult;
}
@@ -213,7 +215,7 @@ public class MergingWindowSet<W extends Window> {
}
// the new window created a new, self-contained window without merging
- if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
+ if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
this.mapping.put(resultWindow, resultWindow);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/16d33d0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
index 7c1fa93..46169a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java
@@ -18,15 +18,25 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.junit.Test;
import org.mockito.Matchers;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.hasItem;
@@ -44,6 +54,40 @@ import static org.mockito.Mockito.*;
*/
public class MergingWindowSetTest {
+ /**
+ * This test uses a special (misbehaving) {@code MergingWindowAssigner} that produces cases
+ * where windows that don't overlap with the newly added window are being merged. We verify
+ * that the merging window set is nevertheless correct and contains all added windows.
+ */
+ @Test
+ public void testNonEagerMerging() throws Exception {
+ @SuppressWarnings("unchecked")
+ ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class);
+
+ MergingWindowSet<TimeWindow> windowSet =
+ new MergingWindowSet<>(new NonEagerlyMergingWindowAssigner(3000), mockState);
+
+ TestingMergeFunction mergeFunction = new TestingMergeFunction();
+
+ TimeWindow result;
+
+ mergeFunction.reset();
+ result = windowSet.addWindow(new TimeWindow(0, 2), mergeFunction);
+ assertNotNull(windowSet.getStateWindow(result));
+
+ mergeFunction.reset();
+ result = windowSet.addWindow(new TimeWindow(2, 5), mergeFunction);
+ assertNotNull(windowSet.getStateWindow(result));
+
+ mergeFunction.reset();
+ result = windowSet.addWindow(new TimeWindow(1, 2), mergeFunction);
+ assertNotNull(windowSet.getStateWindow(result));
+
+ mergeFunction.reset();
+ result = windowSet.addWindow(new TimeWindow(10, 12), mergeFunction);
+ assertNotNull(windowSet.getStateWindow(result));
+ }
+
@Test
public void testIncrementalMerging() throws Exception {
@SuppressWarnings("unchecked")
@@ -392,4 +436,68 @@ public class MergingWindowSetTest {
this.sources = mergedWindows;
}
}
+
+ /**
+ * A special {@link MergingWindowAssigner} that lets windows get larger which leads to windows
+ * being merged lazily.
+ */
+ static class NonEagerlyMergingWindowAssigner extends MergingWindowAssigner<Object, TimeWindow> {
+ private static final long serialVersionUID = 1L;
+
+ protected long sessionTimeout;
+
+ public NonEagerlyMergingWindowAssigner(long sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ @Override
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+ return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+ }
+
+ @Override
+ public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+ return EventTimeTrigger.create();
+ }
+
+ @Override
+ public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+ return new TimeWindow.Serializer();
+ }
+
+ @Override
+ public boolean isEventTime() {
+ return true;
+ }
+
+ /**
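+ * Merges the {@link TimeWindow}s whose start lies within the earliest-starting window into a target that extends that window's end by one; does nothing unless more than one window qualifies.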
+ */
+ public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
+
+ TimeWindow earliestStart = null;
+
+ for (TimeWindow win : windows) {
+ if (earliestStart == null) {
+ earliestStart = win;
+ } else if (win.getStart() < earliestStart.getStart()) {
+ earliestStart = win;
+ }
+ }
+
+ List<TimeWindow> associatedWindows = new ArrayList<>();
+
+ for (TimeWindow win : windows) {
+ if (win.getStart() < earliestStart.getEnd() && win.getStart() >= earliestStart.getStart()) {
+ associatedWindows.add(win);
+ }
+ }
+
+ TimeWindow target = new TimeWindow(earliestStart.getStart(), earliestStart.getEnd() + 1);
+
+ if (associatedWindows.size() > 1) {
+ c.merge(associatedWindows, target);
+ }
+ }
+ }
}