You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this conversion.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/07 10:36:12 UTC

flink git commit: [hotfix] Fix EvictingWindowOperator to work with MergingWindowAssigner

Repository: flink
Updated Branches:
  refs/heads/master 02ef68fb4 -> a3415028d


[hotfix] Fix EvictingWindowOperator to work with MergingWindowAssigner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3415028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3415028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3415028

Branch: refs/heads/master
Commit: a3415028dd7bec24e170c0d8a3531b19c021aaaf
Parents: 02ef68f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Apr 6 18:12:34 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 7 10:35:24 2016 +0200

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 138 ++++++++++++++-----
 1 file changed, 102 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3415028/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 54d8f9f..1e4e453 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -24,9 +24,12 @@ import com.google.common.collect.Iterables;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -76,22 +79,86 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	@Override
 	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(),
+				element.getTimestamp());
 
 		K key = (K) getStateBackend().getCurrentKey();
 
-		for (W window: elementWindows) {
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get(getStateBackend().getCurrentKey());
+			if (mergingWindows == null) {
+				mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner);
+				mergingWindowsByKey.put(key, mergingWindows);
+			}
+
+
+			for (W window : elementWindows) {
+				// If there is a merge, it can only result in a window that contains our new
+				// element because we always eagerly merge
+				final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
+
+
+				// adding the new window might result in a merge, in that case the actualWindow
+				// is the merged window and we work with that. If we don't merge then
+				// actualWindow == window
+				W actualWindow = mergingWindows.addWindow(window,
+						new MergingWindowSet.MergeFunction<W>() {
+							@Override
+							public void merge(W mergeResult,
+									Collection<W> mergedWindows, W stateWindowResult,
+									Collection<W> mergedStateWindows) throws Exception {
+								context.window = mergeResult;
+
+								// store for later use
+								mergeTriggerResult.f0 = context.onMerge(mergedWindows);
+
+								for (W m : mergedWindows) {
+									context.window = m;
+									context.clear();
+								}
+
+								// merge the merged state windows into the newly resulting state window
+								getStateBackend().mergePartitionedStates(stateWindowResult,
+										mergedStateWindows,
+										windowSerializer,
+										(StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
+							}
+						});
+
+				W stateWindow = mergingWindows.getStateWindow(actualWindow);
+				ListState<StreamRecord<IN>> windowState = getPartitionedState(stateWindow,
+						windowSerializer,
+						windowStateDescriptor);
+				windowState.add(element);
+
+				context.key = key;
+				context.window = actualWindow;
+
+				// we might have already fired because of a merge but still call onElement
+				// on the (possibly merged) window
+				TriggerResult triggerResult = context.onElement(element);
+
+				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult,
+						mergeTriggerResult.f0);
+
+				processTriggerResult(combinedTriggerResult, key, actualWindow);
+			}
 
-			ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer,
-				windowStateDescriptor);
+		} else {
+			for (W window : elementWindows) {
 
-			windowState.add(element);
+				ListState<StreamRecord<IN>> windowState = getPartitionedState(window,
+						windowSerializer,
+						windowStateDescriptor);
 
-			context.key = key;
-			context.window = window;
-			TriggerResult triggerResult = context.onElement(element);
+				windowState.add(element);
 
-			processTriggerResult(triggerResult, key, window);
+				context.key = key;
+				context.window = window;
+				TriggerResult triggerResult = context.onElement(element);
+
+				processTriggerResult(triggerResult, key, window);
+			}
 		}
 	}
 
@@ -103,44 +170,43 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 			return;
 		}
 
-		if (triggerResult.isFire()) {
-			timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+		ListState<StreamRecord<IN>> windowState;
 
-			setKeyContext(key);
+		MergingWindowSet<W> mergingWindows = null;
 
-			ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer,
-				windowStateDescriptor);
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			mergingWindows = mergingWindowsByKey.get(key);
+			W stateWindow = mergingWindows.getStateWindow(window);
+			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
 
+		} else {
+			windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
+		}
+
+		if (triggerResult.isFire()) {
+			timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 			Iterable<StreamRecord<IN>> contents = windowState.get();
 
 			// Work around type system restrictions...
 			int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
 
 			FluentIterable<IN> projectedContents = FluentIterable
-				.from(contents)
-				.skip(toEvict)
-				.transform(new Function<StreamRecord<IN>, IN>() {
-					@Override
-					public IN apply(StreamRecord<IN> input) {
-						return input.getValue();
-					}
-				});
+					.from(contents)
+					.skip(toEvict)
+					.transform(new Function<StreamRecord<IN>, IN>() {
+						@Override
+						public IN apply(StreamRecord<IN> input) {
+							return input.getValue();
+						}
+					});
 			userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
-
-			if (triggerResult.isPurge()) {
-				windowState.clear();
-			} else {
-				// we have to clear the state and set the elements that remain after eviction
-				windowState.clear();
-				for (StreamRecord<IN> rec: FluentIterable.from(contents).skip(toEvict)) {
-					windowState.add(rec);
-				}
-			}
-		} else if (triggerResult.isPurge()) {
-			setKeyContext(key);
-			ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer,
-				windowStateDescriptor);
+		}
+		if (triggerResult.isPurge()) {
 			windowState.clear();
+			if (mergingWindows != null) {
+				mergingWindows.retireWindow(window);
+			}
+			context.clear();
 		}
 	}