You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 17:38:48 UTC

[1/2] flink git commit: [hotfix] Improve handling of Window Trigger results

Repository: flink
Updated Branches:
  refs/heads/release-0.10 15d3f10c1 -> 856b27837


[hotfix] Improve handling of Window Trigger results

This enhances the TriggerResult enum with methods isFire() and isPurge()
that simplify the logic in WindowOperator.processTriggerResult().

Also, the operator now keeps track of the current watermark and fires
immediately if a trigger registers an event-time callback for a
timestamp that lies in the past. For this, the TriggerResult now has a
method merge() that allows merging two TriggerResults.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/856b2783
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/856b2783
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/856b2783

Branch: refs/heads/release-0.10
Commit: 856b27837df97f4341d84e8e0ccaa61de43939b5
Parents: bc5b852
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 11:31:13 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 17:38:24 2015 +0200

----------------------------------------------------------------------
 .../api/windowing/triggers/Trigger.java         |  45 ++++++-
 .../windowing/NonKeyedWindowOperator.java       | 107 +++++++++++-----
 .../operators/windowing/WindowOperator.java     | 127 +++++++++++++------
 .../flink/streaming/util/TestHarnessUtil.java   |  33 ++++-
 4 files changed, 236 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/856b2783/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 15ccb33..ee6a279 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -77,10 +77,51 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * <p>
 	 * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window
 	 * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane
-	 * are purged. On {@code CONTINUE} nothing happens, processing continues.
+	 * are purged. On {@code CONTINUE} nothing happens, processing continues. On {@code PURGE}
+	 * the contents of the window are discarded and no result is emitted for the window.
 	 */
 	enum TriggerResult {
-		CONTINUE, FIRE_AND_PURGE, FIRE
+		CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true);
+
+		private final boolean fire;
+		private final boolean purge;
+
+		TriggerResult(boolean fire, boolean purge) {
+			this.purge = purge;
+			this.fire = fire;
+		}
+
+		public boolean isFire() {
+			return fire;
+		}
+
+		public boolean isPurge() {
+			return purge;
+		}
+
+		/**
+		 * Merges two {@code TriggerResults}. This specifies what should happen if we have
+		 * two results from a Trigger, for example as a result from
+		 * {@link #onElement(Object, long, Window, TriggerContext)} and
+		 * {@link #onEventTime(long, Window, TriggerContext)}.
+		 *
+		 * <p>
+		 * For example, if one result says {@code CONTINUE} while the other says {@code FIRE}
+		 * then {@code FIRE} is the combined result.
+		 */
+		public static TriggerResult merge(TriggerResult a, TriggerResult b) {
+			if (a.purge || b.purge) {
+				if (a.fire || b.fire) {
+					return FIRE_AND_PURGE;
+				} else {
+					return PURGE;
+				}
+			} else if (a.fire || b.fire) {
+				return FIRE;
+			} else {
+				return CONTINUE;
+			}
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/856b2783/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 2209d5e..d12a930 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -122,6 +123,12 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
+	/**
+	 * To keep track of the current watermark so that we can immediately fire if a trigger
+	 * registers an event time callback for a timestamp that lies in the past.
+	 */
+	protected transient long currentWatermark = -1L;
+
 	// ------------------------------------------------------------------------
 	// State that needs to be checkpointed
 	// ------------------------------------------------------------------------
@@ -152,6 +159,11 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		currentWatermark = -1;
+	}
+
 	@Override
 	@SuppressWarnings("unchecked")
 	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -232,7 +244,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				windows.put(window, context);
 			}
 			context.windowBuffer.storeElement(element);
-			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
+			Trigger.TriggerResult triggerResult = context.onElement(element);
 			processTriggerResult(triggerResult, window);
 		}
 	}
@@ -249,53 +261,60 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
-		switch (triggerResult) {
-			case FIRE: {
-				Context context = windows.get(window);
-				if (context == null) {
-					LOG.debug("Window {} already gone.", window);
-					return;
-				}
-
-
-				emitWindow(context);
-				break;
-			}
-
-			case FIRE_AND_PURGE: {
-				Context context = windows.remove(window);
-				if (context == null) {
-					LOG.debug("Window {} already gone.", window);
-					return;
-				}
+		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
+			// do nothing
+			return;
+		}
+		Context context;
 
-				emitWindow(context);
-				break;
-			}
+		if (triggerResult.isPurge()) {
+			context = windows.remove(window);
+		} else {
+			context = windows.get(window);
+		}
+		if (context == null) {
+			LOG.debug("Window {} already gone.", window);
+			return;
+		}
 
-			case CONTINUE:
-				// ingore
+		if (triggerResult.isFire()) {
+			emitWindow(context);
 		}
 	}
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
+		Set<Context> toTrigger = new HashSet<>();
 
+		// we cannot call the Trigger in here because trigger methods might register new triggers.
+		// that would lead to concurrent modification errors.
 		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
-					processTriggerResult(triggerResult, context.window);
+					toTrigger.add(context);
 				}
 				toRemove.add(triggers.getKey());
 			}
 		}
 
+		for (Context context: toTrigger) {
+			// double check the time. it can happen that the trigger registers a new timer,
+			// in that case the entry is left in the watermarkTimers set for performance reasons.
+			// We have to check here whether the entry in the set still reflects the
+			// currently set timer in the Context.
+			if (context.watermarkTimer <= mark.getTimestamp()) {
+				Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
+				processTriggerResult(triggerResult, context.window);
+			}
+		}
+
 		for (Long l: toRemove) {
 			watermarkTimers.remove(l);
 		}
 		output.emitWatermark(mark);
+
+		this.currentWatermark = mark.getTimestamp();
 	}
 
 	@Override
@@ -318,8 +337,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	/**
-	 * A context object that is given to {@code Trigger} functions to allow them to register
-	 * timer/watermark callbacks.
+	 * The {@code Context} is responsible for keeping track of the state of one pane.
+	 *
+	 * <p>
+	 * A pane is the bucket of elements that have the same key (assigned by the
+	 * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+	 * be in multiple panes if it was assigned to multiple windows by the
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+	 * have their own instance of the {@code Trigger}.
 	 */
 	protected class Context implements Trigger.TriggerContext {
 		protected W window;
@@ -435,8 +460,20 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			triggers.add(this);
 		}
 
+		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
+			Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+			if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+				// fire now and don't wait for the next watermark update
+				Trigger.TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
+				return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+			} else {
+				return onElementResult;
+			}
+		}
+
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
+				processingTimeTimer = -1;
 				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
@@ -445,7 +482,17 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, window, this);
+				watermarkTimer = -1;
+				Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
+
+				if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+					// fire now and don't wait for the next watermark update
+					Trigger.TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
+					return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+				} else {
+					return firstTriggerResult;
+				}
+
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/856b2783/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index e8e001d..c39679f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -147,6 +148,12 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
+	/**
+	 * To keep track of the current watermark so that we can immediately fire if a trigger
+	 * registers an event time callback for a timestamp that lies in the past.
+	 */
+	protected transient long currentWatermark = -1L;
+
 	// ------------------------------------------------------------------------
 	// State that needs to be checkpointed
 	// ------------------------------------------------------------------------
@@ -181,6 +188,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		currentWatermark = -1;
+	}
+
 	@Override
 	@SuppressWarnings("unchecked")
 	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -276,8 +288,9 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 				context = new Context(key, window, windowBuffer);
 				keyWindows.put(window, context);
 			}
+
 			context.windowBuffer.storeElement(element);
-			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
+			Trigger.TriggerResult triggerResult = context.onElement(element);
 			processTriggerResult(triggerResult, key, window);
 		}
 	}
@@ -297,66 +310,68 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
-		switch (triggerResult) {
-			case FIRE: {
-				Map<W, Context> keyWindows = windows.get(key);
-				if (keyWindows == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				Context context = keyWindows.get(window);
-				if (context == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-
-
-				emitWindow(context);
-				break;
-			}
-
-			case FIRE_AND_PURGE: {
-				Map<W, Context> keyWindows = windows.get(key);
-				if (keyWindows == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				Context context = keyWindows.remove(window);
-				if (context == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				if (keyWindows.isEmpty()) {
-					windows.remove(key);
-				}
+		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
+			// do nothing
+			return;
+		}
+		Context context;
+		Map<W, Context> keyWindows = windows.get(key);
+		if (keyWindows == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
 
-				emitWindow(context);
-				break;
+		if (triggerResult.isPurge()) {
+			context = keyWindows.remove(window);
+			if (keyWindows.isEmpty()) {
+				windows.remove(key);
 			}
+		} else {
+			context = keyWindows.get(window);
+		}
+		if (context == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
 
-			case CONTINUE:
-				// ingore
+		if (triggerResult.isFire()) {
+			emitWindow(context);
 		}
 	}
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
+		Set<Context> toTrigger = new HashSet<>();
 
+		// we cannot call the Trigger in here because trigger methods might register new triggers.
+		// that would lead to concurrent modification errors.
 		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
-					processTriggerResult(triggerResult, context.key, context.window);
+					toTrigger.add(context);
 				}
 				toRemove.add(triggers.getKey());
 			}
 		}
 
+		for (Context context: toTrigger) {
+			// double check the time. it can happen that the trigger registers a new timer,
+			// in that case the entry is left in the watermarkTimers set for performance reasons.
+			// We have to check here whether the entry in the set still reflects the
+			// currently set timer in the Context.
+			if (context.watermarkTimer <= mark.getTimestamp()) {
+				Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
+				processTriggerResult(triggerResult, context.key, context.window);
+			}
+		}
+
 		for (Long l: toRemove) {
 			watermarkTimers.remove(l);
 		}
 		output.emitWatermark(mark);
+
+		this.currentWatermark = mark.getTimestamp();
 	}
 
 	@Override
@@ -380,8 +395,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	/**
-	 * A context object that is given to {@code Trigger} functions to allow them to register
-	 * timer/watermark callbacks.
+	 * The {@code Context} is responsible for keeping track of the state of one pane.
+	 *
+	 * <p>
+	 * A pane is the bucket of elements that have the same key (assigned by the
+	 * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+	 * be in multiple panes if it was assigned to multiple windows by the
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+	 * have their own instance of the {@code Trigger}.
 	 */
 	protected class Context implements Trigger.TriggerContext {
 		protected K key;
@@ -508,8 +529,20 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 			triggers.add(this);
 		}
 
+		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
+			Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+			if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+				// fire now and don't wait for the next watermark update
+				Trigger.TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
+				return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+			} else {
+				return onElementResult;
+			}
+		}
+
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
+				processingTimeTimer = -1;
 				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
@@ -518,7 +551,17 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, window, this);
+				watermarkTimer = -1;
+				Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
+
+				if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+					// fire now and don't wait for the next watermark update
+					Trigger.TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
+					return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+				} else {
+					return firstTriggerResult;
+				}
+
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/856b2783/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 889ae37..9d1e674 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -17,11 +17,14 @@
  */
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -72,8 +75,34 @@ public class TestHarnessUtil {
 	 * Compare the two queues containing operator/task output by converting them to an array first.
 	 */
 	public static void assertOutputEqualsSorted(String message, Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
-		Object[] sortedExpected = expected.toArray();
-		Object[] sortedActual = actual.toArray();
+		// first, compare only watermarks, their position should be deterministic
+		Iterator<Object> exIt = expected.iterator();
+		Iterator<Object> actIt = actual.iterator();
+		while (exIt.hasNext()) {
+			Object nextEx = exIt.next();
+			Object nextAct = actIt.next();
+			if (nextEx instanceof Watermark) {
+				Assert.assertEquals(nextEx, nextAct);
+			}
+		}
+
+		List<Object> expectedRecords = new ArrayList<>();
+		List<Object> actualRecords = new ArrayList<>();
+
+		for (Object ex: expected) {
+			if (ex instanceof StreamRecord) {
+				expectedRecords.add(ex);
+			}
+		}
+
+		for (Object act: actual) {
+			if (act instanceof StreamRecord) {
+				actualRecords.add(act);
+			}
+		}
+
+		Object[] sortedExpected = expectedRecords.toArray();
+		Object[] sortedActual = actualRecords.toArray();
 
 		Arrays.sort(sortedExpected, comparator);
 		Arrays.sort(sortedActual, comparator);


[2/2] flink git commit: [hotfix] Add Window Parameter in Trigger.onEventTime/onProcessingTime

Posted by al...@apache.org.
[hotfix] Add Window Parameter in Trigger.onEventTime/onProcessingTime

Previously, these trigger methods had no information about the window that
they are responsible for. This information might be required for
implementing more advanced trigger behaviour.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc5b852a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc5b852a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc5b852a

Branch: refs/heads/release-0.10
Commit: bc5b852a29bd76a53975f40909e5259e034c9980
Parents: 15d3f10
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 11:28:58 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 17:38:24 2015 +0200

----------------------------------------------------------------------
 .../streaming/examples/windowing/SessionWindowing.java  |  5 ++---
 .../api/windowing/assigners/GlobalWindows.java          |  4 ++--
 .../windowing/triggers/ContinuousEventTimeTrigger.java  |  5 ++---
 .../triggers/ContinuousProcessingTimeTrigger.java       |  5 ++---
 .../streaming/api/windowing/triggers/CountTrigger.java  |  5 ++---
 .../streaming/api/windowing/triggers/DeltaTrigger.java  |  5 ++---
 .../api/windowing/triggers/EventTimeTrigger.java        | 12 +++++++-----
 .../api/windowing/triggers/ProcessingTimeTrigger.java   |  5 ++---
 .../api/windowing/triggers/PurgingTrigger.java          |  8 ++++----
 .../flink/streaming/api/windowing/triggers/Trigger.java | 12 ++++++------
 .../operators/windowing/NonKeyedWindowOperator.java     |  4 ++--
 .../runtime/operators/windowing/WindowOperator.java     |  4 ++--
 .../apache/flink/streaming/util/TestHarnessUtil.java    |  1 -
 13 files changed, 35 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 3c63156..035727a 100644
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -126,7 +126,7 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
+		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
 			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
 			Long lastSeen = lastSeenState.value();
 
@@ -137,8 +137,7 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onProcessingTime(long time,
-				TriggerContext ctx) throws Exception {
+		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
 			return TriggerResult.CONTINUE;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 4d5b9d7..99a4962 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -79,12 +79,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 		}
 
 		@Override
-		public TriggerResult onEventTime(long time, TriggerContext ctx) {
+		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
 
 		@Override
-		public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
+		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index ea26309..4b6af8f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -57,14 +57,13 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
 		ctx.registerEventTimeTimer(time + interval);
 		return TriggerResult.FIRE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index be56738..66f9bda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -63,13 +63,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 
 		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
 		long nextFireTimestamp = fireState.value();

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 8512989..efb62d7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -49,13 +49,12 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 1c6523d..d791d28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -59,13 +59,12 @@ public class DeltaTrigger<T extends Serializable, W extends Window> implements T
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 4b6613c..831e360 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -37,13 +37,12 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
 		return TriggerResult.FIRE_AND_PURGE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 
@@ -53,10 +52,13 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	/**
-	 * Creates trigger that fires once the watermark passes the end of the window.
+	 * Creates an event-time trigger that fires once the watermark passes the end of the window.
+	 *
+	 * <p>
+	 * Once the trigger fires, all elements are discarded. Each element that arrives late
+	 * immediately triggers a window evaluation containing just that one element.
 	 */
 	public static EventTimeTrigger create() {
 		return new EventTimeTrigger();
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 6278ba6..b460c8a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -35,13 +35,12 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time,
-			TriggerContext ctx) throws Exception {
+	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
+	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
 		return TriggerResult.FIRE_AND_PURGE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index eaca336..cc20296 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -53,8 +53,8 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onEventTime(time, ctx);
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
+		TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
 		switch (triggerResult) {
 			case FIRE:
 				return TriggerResult.FIRE_AND_PURGE;
@@ -66,8 +66,8 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, ctx);
+	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
+		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
 		switch (triggerResult) {
 			case FIRE:
 				return TriggerResult.FIRE_AND_PURGE;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index ef8110b..15ccb33 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -60,7 +60,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception;
+	TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
 
 	/**
 	 * Called when an event-time timer that was set using the trigger context fires.
@@ -68,7 +68,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception;
+	TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
 
 
 	/**
@@ -91,19 +91,19 @@ public interface Trigger<T, W extends Window> extends Serializable {
 
 		/**
 		 * Register a system time callback. When the current system time passes the specified
-		 * time {@link #onProcessingTime(long, TriggerContext)} is called with the time specified here.
+		 * time {@link #onProcessingTime(long, Window, TriggerContext)} is called with the time specified here.
 		 *
-		 * @param time The time at which to invoke {@link #onProcessingTime(long, TriggerContext)}
+		 * @param time The time at which to invoke {@link #onProcessingTime(long, Window, TriggerContext)}
 		 */
 		void registerProcessingTimeTimer(long time);
 
 		/**
 		 * Register an event-time callback. When the current watermark passes the specified
-		 * time {@link #onEventTime(long, TriggerContext)} is called with the time specified here.
+		 * time {@link #onEventTime(long, Window, TriggerContext)} is called with the time specified here.
 		 *
 		 * @see org.apache.flink.streaming.api.watermark.Watermark
 		 *
-		 * @param time The watermark at which to invoke {@link #onEventTime(long, TriggerContext)}
+		 * @param time The watermark at which to invoke {@link #onEventTime(long, Window, TriggerContext)}
 		 */
 		void registerEventTimeTimer(long time);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 03e8c4c..2209d5e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -437,7 +437,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
-				return trigger.onProcessingTime(time, this);
+				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}
@@ -445,7 +445,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, this);
+				return trigger.onEventTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 30ce477..e8e001d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -510,7 +510,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
-				return trigger.onProcessingTime(time, this);
+				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}
@@ -518,7 +518,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, this);
+				return trigger.onEventTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc5b852a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 0c5cd8f..889ae37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -21,7 +21,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;