You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/07 10:36:12 UTC
flink git commit: [hotfix] Fix EvictingWindowOperator to work with MergingWindowAssigner
Repository: flink
Updated Branches:
refs/heads/master 02ef68fb4 -> a3415028d
[hotfix] Fix EvictingWindowOperator to work with MergingWindowAssigner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3415028
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3415028
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3415028
Branch: refs/heads/master
Commit: a3415028dd7bec24e170c0d8a3531b19c021aaaf
Parents: 02ef68f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Apr 6 18:12:34 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 7 10:35:24 2016 +0200
----------------------------------------------------------------------
.../windowing/EvictingWindowOperator.java | 138 ++++++++++++++-----
1 file changed, 102 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a3415028/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 54d8f9f..1e4e453 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -24,9 +24,12 @@ import com.google.common.collect.Iterables;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -76,22 +79,86 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
- Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+ Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(),
+ element.getTimestamp());
K key = (K) getStateBackend().getCurrentKey();
- for (W window: elementWindows) {
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get(getStateBackend().getCurrentKey());
+ if (mergingWindows == null) {
+ mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner);
+ mergingWindowsByKey.put(key, mergingWindows);
+ }
+
+
+ for (W window : elementWindows) {
+ // If there is a merge, it can only result in a window that contains our new
+ // element because we always eagerly merge
+ final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
+
+
+ // adding the new window might result in a merge, in that case the actualWindow
+ // is the merged window and we work with that. If we don't merge then
+ // actualWindow == window
+ W actualWindow = mergingWindows.addWindow(window,
+ new MergingWindowSet.MergeFunction<W>() {
+ @Override
+ public void merge(W mergeResult,
+ Collection<W> mergedWindows, W stateWindowResult,
+ Collection<W> mergedStateWindows) throws Exception {
+ context.window = mergeResult;
+
+ // store for later use
+ mergeTriggerResult.f0 = context.onMerge(mergedWindows);
+
+ for (W m : mergedWindows) {
+ context.window = m;
+ context.clear();
+ }
+
+ // merge the merged state windows into the newly resulting state window
+ getStateBackend().mergePartitionedStates(stateWindowResult,
+ mergedStateWindows,
+ windowSerializer,
+ (StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
+ }
+ });
+
+ W stateWindow = mergingWindows.getStateWindow(actualWindow);
+ ListState<StreamRecord<IN>> windowState = getPartitionedState(stateWindow,
+ windowSerializer,
+ windowStateDescriptor);
+ windowState.add(element);
+
+ context.key = key;
+ context.window = actualWindow;
+
+ // we might have already fired because of a merge but still call onElement
+ // on the (possibly merged) window
+ TriggerResult triggerResult = context.onElement(element);
+
+ TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult,
+ mergeTriggerResult.f0);
+
+ processTriggerResult(combinedTriggerResult, key, actualWindow);
+ }
- ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer,
- windowStateDescriptor);
+ } else {
+ for (W window : elementWindows) {
- windowState.add(element);
+ ListState<StreamRecord<IN>> windowState = getPartitionedState(window,
+ windowSerializer,
+ windowStateDescriptor);
- context.key = key;
- context.window = window;
- TriggerResult triggerResult = context.onElement(element);
+ windowState.add(element);
- processTriggerResult(triggerResult, key, window);
+ context.key = key;
+ context.window = window;
+ TriggerResult triggerResult = context.onElement(element);
+
+ processTriggerResult(triggerResult, key, window);
+ }
}
}
@@ -103,44 +170,43 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
return;
}
- if (triggerResult.isFire()) {
- timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+ ListState<StreamRecord<IN>> windowState;
- setKeyContext(key);
+ MergingWindowSet<W> mergingWindows = null;
- ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer,
- windowStateDescriptor);
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = mergingWindowsByKey.get(key);
+ W stateWindow = mergingWindows.getStateWindow(window);
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
+ }
+
+ if (triggerResult.isFire()) {
+ timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
Iterable<StreamRecord<IN>> contents = windowState.get();
// Work around type system restrictions...
int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
FluentIterable<IN> projectedContents = FluentIterable
- .from(contents)
- .skip(toEvict)
- .transform(new Function<StreamRecord<IN>, IN>() {
- @Override
- public IN apply(StreamRecord<IN> input) {
- return input.getValue();
- }
- });
+ .from(contents)
+ .skip(toEvict)
+ .transform(new Function<StreamRecord<IN>, IN>() {
+ @Override
+ public IN apply(StreamRecord<IN> input) {
+ return input.getValue();
+ }
+ });
userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
-
- if (triggerResult.isPurge()) {
- windowState.clear();
- } else {
- // we have to clear the state and set the elements that remain after eviction
- windowState.clear();
- for (StreamRecord<IN> rec: FluentIterable.from(contents).skip(toEvict)) {
- windowState.add(rec);
- }
- }
- } else if (triggerResult.isPurge()) {
- setKeyContext(key);
- ListState<StreamRecord<IN>> windowState = getPartitionedState(window, windowSerializer,
- windowStateDescriptor);
+ }
+ if (triggerResult.isPurge()) {
windowState.clear();
+ if (mergingWindows != null) {
+ mergingWindows.retireWindow(window);
+ }
+ context.clear();
}
}