You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 17:38:49 UTC

[2/2] flink git commit: [hotfix] Add Window Parameter in Trigger.onEventTime/onProcessingTime

[hotfix] Add Window Parameter in Trigger.onEventTime/onProcessingTime

Before these trigger methods had no information about the window that
they are responsible for. This information might be required for
implementing more advanced trigger behaviour.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc5b852a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc5b852a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc5b852a

Branch: refs/heads/release-0.10
Commit: bc5b852a29bd76a53975f40909e5259e034c9980
Parents: 15d3f10
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 11:28:58 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 17:38:24 2015 +0200

----------------------------------------------------------------------
 .../streaming/examples/windowing/SessionWindowing.java  |  5 ++---
 .../api/windowing/assigners/GlobalWindows.java          |  4 ++--
 .../windowing/triggers/ContinuousEventTimeTrigger.java  |  5 ++---
 .../triggers/ContinuousProcessingTimeTrigger.java       |  5 ++---
 .../streaming/api/windowing/triggers/CountTrigger.java  |  5 ++---
 .../streaming/api/windowing/triggers/DeltaTrigger.java  |  5 ++---
 .../api/windowing/triggers/EventTimeTrigger.java        | 12 +++++++-----
 .../api/windowing/triggers/ProcessingTimeTrigger.java   |  5 ++---
 .../api/windowing/triggers/PurgingTrigger.java          |  8 ++++----
 .../flink/streaming/api/windowing/triggers/Trigger.java | 12 ++++++------
 .../operators/windowing/NonKeyedWindowOperator.java     |  4 ++--
 .../runtime/operators/windowing/WindowOperator.java     |  4 ++--
 .../apache/flink/streaming/util/TestHarnessUtil.java    |  1 -
 13 files changed, 35 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 3c63156..035727a 100644
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -126,7 +126,7 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
+		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
 			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
 			Long lastSeen = lastSeenState.value();
 
@@ -137,8 +137,7 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onProcessingTime(long time,
-				TriggerContext ctx) throws Exception {
+		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
 			return TriggerResult.CONTINUE;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 4d5b9d7..99a4962 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -79,12 +79,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 		}
 
 		@Override
-		public TriggerResult onEventTime(long time, TriggerContext ctx) {
+		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
 
 		@Override
-		public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
+		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index ea26309..4b6af8f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -57,14 +57,13 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
 		ctx.registerEventTimeTimer(time + interval);
 		return TriggerResult.FIRE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index be56738..66f9bda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -63,13 +63,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 
 		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
 		long nextFireTimestamp = fireState.value();

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 8512989..efb62d7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -49,13 +49,12 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 1c6523d..d791d28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -59,13 +59,12 @@ public class DeltaTrigger<T extends Serializable, W extends Window> implements T
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 4b6613c..831e360 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -37,13 +37,12 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
 		return TriggerResult.FIRE_AND_PURGE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 
@@ -53,10 +52,13 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	/**
-	 * Creates trigger that fires once the watermark passes the end of the window.
+	 * Creates an event-time trigger that fires once the watermark passes the end of the window.
+	 *
+	 * <p>
+	 * Once the trigger fires all elements are discarded. Elements that arrive late immediately
+	 * trigger window evaluation with just this one element.
 	 */
 	public static EventTimeTrigger create() {
 		return new EventTimeTrigger();
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 6278ba6..b460c8a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -35,13 +35,12 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
+	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
 		return TriggerResult.FIRE_AND_PURGE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index eaca336..cc20296 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -53,8 +53,8 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onEventTime(time, ctx);
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
+		TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
 		switch (triggerResult) {
 			case FIRE:
 				return TriggerResult.FIRE_AND_PURGE;
@@ -66,8 +66,8 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, ctx);
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
+		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
 		switch (triggerResult) {
 			case FIRE:
 				return TriggerResult.FIRE_AND_PURGE;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index ef8110b..15ccb33 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -60,7 +60,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception;
+	TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
 
 	/**
 	 * Called when an event-time timer that was set using the trigger context fires.
@@ -68,7 +68,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception;
+	TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
 
 
 	/**
@@ -91,19 +91,19 @@ public interface Trigger<T, W extends Window> extends Serializable {
 
 		/**
 		 * Register a system time callback. When the current system time passes the specified
-		 * time {@link #onProcessingTime(long, TriggerContext)} is called with the time specified here.
+		 * time {@link #onProcessingTime(long, Window, TriggerContext)} is called with the time specified here.
 		 *
-		 * @param time The time at which to invoke {@link #onProcessingTime(long, TriggerContext)}
+		 * @param time The time at which to invoke {@link #onProcessingTime(long, Window, TriggerContext)}
 		 */
 		void registerProcessingTimeTimer(long time);
 
 		/**
 		 * Register an event-time callback. When the current watermark passes the specified
-		 * time {@link #onEventTime(long, TriggerContext)} is called with the time specified here.
+		 * time {@link #onEventTime(long, Window, TriggerContext)} is called with the time specified here.
 		 *
 		 * @see org.apache.flink.streaming.api.watermark.Watermark
 		 *
-		 * @param time The watermark at which to invoke {@link #onEventTime(long, TriggerContext)}
+		 * @param time The watermark at which to invoke {@link #onEventTime(long, Window, TriggerContext)}
 		 */
 		void registerEventTimeTimer(long time);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 03e8c4c..2209d5e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -437,7 +437,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
-				return trigger.onProcessingTime(time, this);
+				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}
@@ -445,7 +445,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, this);
+				return trigger.onEventTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 30ce477..e8e001d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -510,7 +510,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
-				return trigger.onProcessingTime(time, this);
+				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}
@@ -518,7 +518,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, this);
+				return trigger.onEventTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 0c5cd8f..889ae37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -21,7 +21,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;