You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/28 12:08:43 UTC

flink git commit: [FLINK-4993] Don't Allow Trigger.onMerge() to return TriggerResult

Repository: flink
Updated Branches:
  refs/heads/master e2b93f69c -> 1875cac03


[FLINK-4993] Don't Allow Trigger.onMerge() to return TriggerResult

Allowing Trigger.onMerge() to return a TriggerResult is not necessary
since an onMerge() call will always be followed by an onElement() call
when adding the element that caused the merging to the merged window.
Having this complicates the internal logic of the WindowOperator and
makes writing Triggers more confusing than it has to be.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1875cac0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1875cac0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1875cac0

Branch: refs/heads/master
Commit: 1875cac03042dad4a4c47b0de8364f02fbe457c6
Parents: e2b93f6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Nov 2 11:06:01 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 28 12:48:11 2016 +0100

----------------------------------------------------------------------
 .../api/windowing/assigners/GlobalWindows.java  |  4 +--
 .../triggers/ContinuousEventTimeTrigger.java    |  3 +--
 .../ContinuousProcessingTimeTrigger.java        |  3 +--
 .../api/windowing/triggers/CountTrigger.java    |  7 +-----
 .../windowing/triggers/EventTimeTrigger.java    |  5 ++--
 .../triggers/ProcessingTimeTrigger.java         |  3 +--
 .../api/windowing/triggers/PurgingTrigger.java  |  5 ++--
 .../api/windowing/triggers/Trigger.java         |  2 +-
 .../api/windowing/triggers/TriggerResult.java   | 26 --------------------
 .../windowing/EvictingWindowOperator.java       | 15 +++--------
 .../operators/windowing/WindowOperator.java     | 18 ++++----------
 .../operators/windowing/WindowOperatorTest.java |  6 ++---
 12 files changed, 21 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 1c6284a..7ea3158 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -92,9 +92,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
 
 		@Override
-		public TriggerResult onMerge(GlobalWindow window,
-				OnMergeContext ctx) {
-			return TriggerResult.CONTINUE;
+		public void onMerge(GlobalWindow window, OnMergeContext ctx) {
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 02c2a42..f3b3e4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -109,13 +109,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 	}
 
 	@Override
-	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
+	public void onMerge(W window, OnMergeContext ctx) throws Exception {
 		ctx.mergePartitionedState(stateDesc);
 		Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
 		if (nextFireTimestamp != null) {
 			ctx.registerEventTimeTimer(nextFireTimestamp);
 		}
-		return TriggerResult.CONTINUE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 287d3df..18c9edb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -97,10 +97,9 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 	}
 
 	@Override
-	public TriggerResult onMerge(W window,
+	public void onMerge(W window,
 			OnMergeContext ctx) {
 		ctx.mergePartitionedState(stateDesc);
-		return TriggerResult.CONTINUE;
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 86c5c4c..ffe74b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -76,13 +76,8 @@ public class CountTrigger<W extends Window> extends Trigger<Object, W> {
 	}
 
 	@Override
-	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
+	public void onMerge(W window, OnMergeContext ctx) throws Exception {
 		ctx.mergePartitionedState(stateDesc);
-		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
-		if (count.get() >= maxCount) {
-			return TriggerResult.FIRE;
-		}
-		return TriggerResult.CONTINUE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index da14ffd..ae25e87 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -67,10 +67,9 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public TriggerResult onMerge(TimeWindow window,
+	public void onMerge(TimeWindow window,
 			OnMergeContext ctx) {
 		ctx.registerEventTimeTimer(window.maxTimestamp());
-		return TriggerResult.CONTINUE;
 	}
 
 	@Override
@@ -88,4 +87,6 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
 	public static EventTimeTrigger create() {
 		return new EventTimeTrigger();
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index a010286..cd7869e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -58,10 +58,9 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public TriggerResult onMerge(TimeWindow window,
+	public void onMerge(TimeWindow window,
 			OnMergeContext ctx) {
 		ctx.registerProcessingTimeTimer(window.maxTimestamp());
-		return TriggerResult.CONTINUE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index f02d1db..ed1d2fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -70,9 +70,8 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx);
-		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
+	public void onMerge(W window, OnMergeContext ctx) throws Exception {
+		nestedTrigger.onMerge(window, ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 3f68e78..a0209aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -104,7 +104,7 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 	 * @param window The new window that results from the merge.
 	 * @param ctx A context object that can be used to register timer callbacks and access state.
 	 */
-	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
+	public void onMerge(W window, OnMergeContext ctx) throws Exception {
 		throw new RuntimeException("This trigger does not support merging.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
index 2841542..cb4c01b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
@@ -67,30 +67,4 @@ public enum TriggerResult {
 	public boolean isPurge() {
 		return purge;
 	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Merges two {@code TriggerResults}. This specifies what should happen if we have
-	 * two results from a Trigger, for example as a result from
-	 * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and
-	 * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}.
-	 *
-	 * <p>
-	 * For example, if one result says {@code CONTINUE} while the other says {@code FIRE}
-	 * then {@code FIRE} is the combined result;
-	 */
-	public static TriggerResult merge(TriggerResult a, TriggerResult b) {
-		if (a.purge || b.purge) {
-			if (a.fire || b.fire) {
-				return FIRE_AND_PURGE;
-			} else {
-				return PURGE;
-			}
-		} else if (a.fire || b.fire) {
-			return FIRE;
-		} else {
-			return CONTINUE;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 150f46e..8c73878 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
@@ -98,10 +97,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 			MergingWindowSet<W> mergingWindows = getMergingWindowSet();
 
 			for (W window : elementWindows) {
-				// If there is a merge, it can only result in a window that contains our new
-				// element because we always eagerly merge
-				final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
-
 
 				// adding the new window might result in a merge, in that case the actualWindow
 				// is the merged window and we work with that. If we don't merge then
@@ -115,8 +110,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 								context.key = key;
 								context.window = mergeResult;
 
-								// store for later use
-								mergeTriggerResult.f0 = context.onMerge(mergedWindows);
+								context.onMerge(mergedWindows);
 
 								for (W m : mergedWindows) {
 									context.window = m;
@@ -152,12 +146,9 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				evictorContext.key = key;
 				evictorContext.window = actualWindow;
 
-				// we might have already fired because of a merge but still call onElement
-				// on the (possibly merged) window
 				TriggerResult triggerResult = context.onElement(element);
-				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
 
-				if (combinedTriggerResult.isFire()) {
+				if (triggerResult.isFire()) {
 					Iterable<StreamRecord<IN>> contents = windowState.get();
 					if (contents == null) {
 						// if we have no state, there is nothing to do
@@ -166,7 +157,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 					fire(actualWindow, contents, windowState);
 				}
 
-				if (combinedTriggerResult.isPurge()) {
+				if (triggerResult.isPurge()) {
 					cleanup(actualWindow, windowState, mergingWindows);
 				} else {
 					registerCleanupTimer(actualWindow);

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 0ead14a..edcd833 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
@@ -227,9 +226,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			MergingWindowSet<W> mergingWindows = getMergingWindowSet();
 
 			for (W window: elementWindows) {
-				// If there is a merge, it can only result in a window that contains our new
-				// element because we always eagerly merge
-				final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
 
 				// adding the new window might result in a merge, in that case the actualWindow
 				// is the merged window and we work with that. If we don't merge then
@@ -242,8 +238,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 						context.key = key;
 						context.window = mergeResult;
 
-						// store for later use
-						mergeTriggerResult.f0 = context.onMerge(mergedWindows);
+						context.onMerge(mergedWindows);
 
 						for (W m: mergedWindows) {
 							context.window = m;
@@ -278,12 +273,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.key = key;
 				context.window = actualWindow;
 
-				// we might have already fired because of a merge but still call onElement
-				// on the (possibly merged) window
 				TriggerResult triggerResult = context.onElement(element);
-				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
 
-				if (combinedTriggerResult.isFire()) {
+				if (triggerResult.isFire()) {
 					ACC contents = windowState.get();
 					if (contents == null) {
 						continue;
@@ -291,7 +283,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 					fire(actualWindow, contents);
 				}
 
-				if (combinedTriggerResult.isPurge()) {
+				if (triggerResult.isPurge()) {
 					cleanup(actualWindow, windowState, mergingWindows);
 				} else {
 					registerCleanupTimer(actualWindow);
@@ -642,9 +634,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			return trigger.onEventTime(time, window, this);
 		}
 
-		public TriggerResult onMerge(Collection<W> mergedWindows) throws Exception {
+		public void onMerge(Collection<W> mergedWindows) throws Exception {
 			this.mergedWindows = mergedWindows;
-			return trigger.onMerge(window, this);
+			trigger.onMerge(window, this);
 		}
 
 		public void clear() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/1875cac0/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2a13294..0e2d1e8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,7 +32,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -2451,10 +2450,9 @@ public class WindowOperatorTest extends TestLogger {
 		}
 
 		@Override
-		public TriggerResult onMerge(TimeWindow window,
+		public void onMerge(TimeWindow window,
 									 OnMergeContext ctx) {
 			ctx.registerEventTimeTimer(window.maxTimestamp());
-			return TriggerResult.CONTINUE;
 		}
 
 		@Override