You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/21 10:17:19 UTC

[2/6] flink git commit: [FLINK-3714] Add Support for "Allowed Lateness"

[FLINK-3714] Add Support for "Allowed Lateness"

Handle late elements and take care
of cleaning the window state.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34a8b03d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34a8b03d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34a8b03d

Branch: refs/heads/master
Commit: 34a8b03d2ad40db7dc00fa47923b96374c289838
Parents: f2e9c52
Author: kl0u <kk...@gmail.com>
Authored: Tue May 31 17:13:58 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 21 12:14:06 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  59 +-
 .../api/datastream/WindowedStream.java          |  53 +-
 .../assigners/EventTimeSessionWindows.java      |   5 +
 .../api/windowing/assigners/GlobalWindows.java  |   5 +
 .../assigners/ProcessingTimeSessionWindows.java |   5 +
 .../assigners/SlidingEventTimeWindows.java      |   5 +
 .../assigners/SlidingProcessingTimeWindows.java |   5 +
 .../assigners/TumblingEventTimeWindows.java     |   5 +
 .../TumblingProcessingTimeWindows.java          |   5 +
 .../api/windowing/assigners/WindowAssigner.java |   6 +
 .../api/windowing/evictors/CountEvictor.java    |   2 +-
 .../windowing/triggers/EventTimeTrigger.java    |   9 +-
 .../windowing/EvictingWindowOperator.java       | 218 +++--
 .../operators/windowing/WindowOperator.java     | 327 ++++---
 .../windowing/EvictingWindowOperatorTest.java   |  75 +-
 .../operators/windowing/WindowOperatorTest.java | 873 ++++++++++++++++++-
 .../streaming/api/scala/AllWindowedStream.scala |  17 +-
 .../streaming/api/scala/WindowedStream.scala    |  15 +
 18 files changed, 1480 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 5a1b56d..e5dacc0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
@@ -87,6 +88,8 @@ public class AllWindowedStream<T, W extends Window> {
 	/** The evictor that is used for evicting elements before window evaluation. */
 	private Evictor<? super T, ? super W> evictor;
 
+	/** The user-specified allowed lateness. */
+	private long allowedLateness = Long.MAX_VALUE;
 
 	@PublicEvolving
 	public AllWindowedStream(DataStream<T> input,
@@ -110,6 +113,26 @@ public class AllWindowedStream<T, W extends Window> {
 	}
 
 	/**
+	 * Sets the allowed lateness to a user-specified value.
+	 * If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
+	 * Setting the allowed lateness is only valid for event-time windows.
+	 * If a value different than 0 is provided with a processing-time
+	 * {@link WindowAssigner}, then an exception is thrown.
+	 */
+	@PublicEvolving
+	public AllWindowedStream<T, W> allowedLateness(Time lateness) {
+		long millis = lateness.toMilliseconds();
+		if (allowedLateness < 0) {
+			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
+		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
+			throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
+		} else {
+			this.allowedLateness = millis;
+		}
+		return this;
+	}
+
+	/**
 	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
 	 *
 	 * <p>
@@ -251,14 +274,16 @@ public class AllWindowedStream<T, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
-			operator = new EvictingWindowOperator<>(windowAssigner,
+			operator =
+				new EvictingWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalIterableAllWindowFunction<>(function),
 					trigger,
-					evictor);
+					evictor,
+					allowedLateness);
 
 		} else {
 			ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -266,13 +291,15 @@ public class AllWindowedStream<T, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 
-			operator = new WindowOperator<>(windowAssigner,
+			operator =
+				new WindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalIterableAllWindowFunction<>(function),
-					trigger);
+					trigger,
+					allowedLateness);
 		}
 
 		return input.transform(opName, resultType, operator).setParallelism(1);
@@ -335,14 +362,16 @@ public class AllWindowedStream<T, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
-			operator = new EvictingWindowOperator<>(windowAssigner,
+			operator =
+				new EvictingWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
 					trigger,
-					evictor);
+					evictor,
+					allowedLateness);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -351,13 +380,15 @@ public class AllWindowedStream<T, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 
-			operator = new WindowOperator<>(windowAssigner,
+			operator =
+				new WindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalSingleValueAllWindowFunction<>(function),
-					trigger);
+					trigger,
+					allowedLateness);
 		}
 
 		return input.transform(opName, resultType, operator).setParallelism(1);
@@ -425,14 +456,16 @@ public class AllWindowedStream<T, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
-			operator = new EvictingWindowOperator<>(windowAssigner,
+			operator =
+				new EvictingWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)),
 					trigger,
-					evictor);
+					evictor,
+					allowedLateness);
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -442,13 +475,15 @@ public class AllWindowedStream<T, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 
-			operator = new WindowOperator<>(windowAssigner,
+			operator =
+				new WindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalSingleValueAllWindowFunction<>(function),
-					trigger);
+					trigger,
+					allowedLateness);
 		}
 
 		return input.transform(opName, resultType, operator).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 84290b2..6110480 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -98,6 +99,8 @@ public class WindowedStream<T, K, W extends Window> {
 	/** The evictor that is used for evicting elements before window evaluation. */
 	private Evictor<? super T, ? super W> evictor;
 
+	/** The user-specified allowed lateness. */
+	private long allowedLateness = Long.MAX_VALUE;
 
 	@PublicEvolving
 	public WindowedStream(KeyedStream<T, K> input,
@@ -121,6 +124,26 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	/**
+	 * Sets the allowed lateness to a user-specified value.
+	 * If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
+	 * Setting the allowed lateness is only valid for event-time windows.
+	 * If a value different than 0 is provided with a processing-time
+	 * {@link WindowAssigner}, then an exception is thrown.
+	 */
+	@PublicEvolving
+	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
+		long millis = lateness.toMilliseconds();
+		if (allowedLateness < 0) {
+			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
+		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
+			throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
+		} else {
+			this.allowedLateness = millis;
+		}
+		return this;
+	}
+
+	/**
 	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
 	 *
 	 * <p>
@@ -272,14 +295,16 @@ public class WindowedStream<T, K, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
-			operator = new EvictingWindowOperator<>(windowAssigner,
+			operator =
+				new EvictingWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalIterableWindowFunction<>(function),
 					trigger,
-					evictor);
+					evictor,
+					allowedLateness);
 
 		} else {
 			ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -287,13 +312,15 @@ public class WindowedStream<T, K, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 
-			operator = new WindowOperator<>(windowAssigner,
+			operator =
+				new WindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalIterableWindowFunction<>(function),
-					trigger);
+					trigger,
+					allowedLateness);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -356,14 +383,16 @@ public class WindowedStream<T, K, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
 
-			operator = new EvictingWindowOperator<>(windowAssigner,
+			operator =
+				new EvictingWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
 					trigger,
-					evictor);
+					evictor,
+					allowedLateness);
 
 		} else {
 			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -372,13 +401,15 @@ public class WindowedStream<T, K, W extends Window> {
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 
-			operator = new WindowOperator<>(windowAssigner,
+			operator =
+				new WindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					stateDesc,
 					new InternalSingleValueWindowFunction<>(function),
-					trigger);
+					trigger,
+					allowedLateness);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -453,7 +484,8 @@ public class WindowedStream<T, K, W extends Window> {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)),
 				trigger,
-				evictor);
+				evictor,
+				allowedLateness);
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -469,7 +501,8 @@ public class WindowedStream<T, K, W extends Window> {
 				input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 				stateDesc,
 				new InternalSingleValueWindowFunction<>(function),
-				trigger);
+				trigger,
+				allowedLateness);
 		}
 
 		return input.transform(opName, resultType, operator);

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index ed5add5..64c14cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -81,6 +81,11 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW
 		return new TimeWindow.Serializer();
 	}
 
+	@Override
+	public boolean isEventTime() {
+		return true;
+	}
+
 	/**
 	 * Merge overlapping {@link TimeWindow}s.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index dcf440c..71101f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -102,4 +102,9 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 	public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new GlobalWindow.Serializer();
 	}
+
+	@Override
+	public boolean isEventTime() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index 608ebbc..0e1682d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -81,6 +81,11 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object,
 		return new TimeWindow.Serializer();
 	}
 
+	@Override
+	public boolean isEventTime() {
+		return false;
+	}
+
 	/**
 	 * Merge overlapping {@link TimeWindow}s.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 89b216e..83511df 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -109,4 +109,9 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new TimeWindow.Serializer();
 	}
+
+	@Override
+	public boolean isEventTime() {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 4b91986..d2b0707 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -101,4 +101,9 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new TimeWindow.Serializer();
 	}
+
+	@Override
+	public boolean isEventTime() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 1f61281..70432a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -95,4 +95,9 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new TimeWindow.Serializer();
 	}
+
+	@Override
+	public boolean isEventTime() {
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 436a9ed..3ec55d0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -86,4 +86,9 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new TimeWindow.Serializer();
 	}
+
+	@Override
+	public boolean isEventTime() {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 683ed42..0b49bce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -63,4 +63,10 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
 	 * this {@code WindowAssigner}.
 	 */
 	public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
+
+	/**
+	 * Returns {@code true} if elements are assigned to windows based on event time,
+	 * {@code false} otherwise.
+	 */
+	public abstract boolean isEventTime();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
index e989cbc..dc82521 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
- * An {@link Evictor} that keeps only a certain amount of elements.
+ * An {@link Evictor} that keeps up to a certain amount of elements.
  *
  * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 75c6a9d..a87e436 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -36,12 +36,17 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
 		ctx.registerEventTimeTimer(window.maxTimestamp());
-		return TriggerResult.CONTINUE;
+
+		return (window.maxTimestamp() <= ctx.getCurrentWatermark()) ?
+			TriggerResult.FIRE_AND_PURGE :
+			TriggerResult.CONTINUE;
 	}
 
 	@Override
 	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
-		return TriggerResult.FIRE_AND_PURGE;
+		return time == window.maxTimestamp() ?
+			TriggerResult.FIRE_AND_PURGE :
+			TriggerResult.CONTINUE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 84ee0b9..fa1c894 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
@@ -70,8 +71,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 		StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
 		InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
 		Trigger<? super IN, ? super W> trigger,
-		Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger);
+		Evictor<? super IN, ? super W> evictor,
+		long allowedLateness) {
+
+		super(windowAssigner, windowSerializer, keySelector,
+			keySerializer, null, windowFunction, trigger, allowedLateness);
 		this.evictor = requireNonNull(evictor);
 		this.windowStateDescriptor = windowStateDescriptor;
 	}
@@ -79,8 +83,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	@Override
 	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(),
-				element.getTimestamp());
+
+		Collection<W> elementWindows = windowAssigner.assignWindows(
+			element.getValue(),
+			element.getTimestamp());
 
 		final K key = (K) getStateBackend().getCurrentKey();
 
@@ -112,20 +118,27 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 								for (W m : mergedWindows) {
 									context.window = m;
 									context.clear();
+									deleteCleanupTimer(m);
 								}
 
 								// merge the merged state windows into the newly resulting state window
-								getStateBackend().mergePartitionedStates(stateWindowResult,
-										mergedStateWindows,
-										windowSerializer,
-										(StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
+								getStateBackend().mergePartitionedStates(
+									stateWindowResult,
+									mergedStateWindows,
+									windowSerializer,
+									(StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
 							}
 						});
 
+				// check if the window is already inactive
+				if (isLate(actualWindow)) {
+					LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness.");
+					continue;
+				}
+
 				W stateWindow = mergingWindows.getStateWindow(actualWindow);
-				ListState<StreamRecord<IN>> windowState = getPartitionedState(stateWindow,
-						windowSerializer,
-						windowStateDescriptor);
+				ListState<StreamRecord<IN>> windowState = getPartitionedState(
+					stateWindow, windowSerializer, windowStateDescriptor);
 				windowState.add(element);
 
 				context.key = key;
@@ -134,77 +147,166 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 				// we might have already fired because of a merge but still call onElement
 				// on the (possibly merged) window
 				TriggerResult triggerResult = context.onElement(element);
-
-				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult,
-						mergeTriggerResult.f0);
-
-				processTriggerResult(combinedTriggerResult, actualWindow);
+				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
+				fireOrContinue(combinedTriggerResult, actualWindow, windowState);
+
+				if (combinedTriggerResult.isPurge()) {
+					cleanup(actualWindow, windowState, mergingWindows);
+				} else {
+					registerCleanupTimer(actualWindow);
+				}
 			}
 
 		} else {
 			for (W window : elementWindows) {
 
-				ListState<StreamRecord<IN>> windowState = getPartitionedState(window,
-						windowSerializer,
-						windowStateDescriptor);
+				// check if the window is already inactive
+				if (isLate(window)) {
+					LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
+					continue;
+				}
 
+				ListState<StreamRecord<IN>> windowState = getPartitionedState(
+					window, windowSerializer, windowStateDescriptor);
 				windowState.add(element);
 
 				context.key = key;
 				context.window = window;
+
 				TriggerResult triggerResult = context.onElement(element);
+				fireOrContinue(triggerResult, window, windowState);
 
-				processTriggerResult(triggerResult, window);
+				if (triggerResult.isPurge()) {
+					cleanup(window, windowState, null);
+				} else {
+					registerCleanupTimer(window);
+				}
 			}
 		}
 	}
 
 	@Override
-	@SuppressWarnings("unchecked,rawtypes")
-	protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
-		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
-			// do nothing
-			return;
-		}
+	public void processWatermark(Watermark mark) throws Exception {
+		boolean fire;
+		do {
+			Timer<K, W> timer = watermarkTimersQueue.peek();
+			if (timer != null && timer.timestamp <= mark.getTimestamp()) {
+				fire = true;
+
+				watermarkTimers.remove(timer);
+				watermarkTimersQueue.remove();
+
+				context.key = timer.key;
+				context.window = timer.window;
+				setKeyContext(timer.key);
+
+				ListState<StreamRecord<IN>> windowState;
+				MergingWindowSet<W> mergingWindows = null;
+
+				if (windowAssigner instanceof MergingWindowAssigner) {
+					mergingWindows = getMergingWindowSet();
+					W stateWindow = mergingWindows.getStateWindow(context.window);
+					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+				} else {
+					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+				}
+
+				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
+				fireOrContinue(triggerResult, context.window, windowState);
+
+				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+					cleanup(timer.window, windowState, mergingWindows);
+				}
+
+			} else {
+				fire = false;
+			}
+		} while (fire);
 
-		ListState<StreamRecord<IN>> windowState;
+		output.emitWatermark(mark);
 
-		MergingWindowSet<W> mergingWindows = null;
+		this.currentWatermark = mark.getTimestamp();
+	}
 
-		if (windowAssigner instanceof MergingWindowAssigner) {
-			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(window);
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+	@Override
+	public void trigger(long time) throws Exception {
+		boolean fire;
 
-		} else {
-			windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
-		}
+		//Remove information about the triggering task
+		processingTimeTimerFutures.remove(time);
+		processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));
 
-		if (triggerResult.isFire()) {
-			timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-			Iterable<StreamRecord<IN>> contents = windowState.get();
-
-			// Work around type system restrictions...
-			int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
-
-			FluentIterable<IN> projectedContents = FluentIterable
-					.from(contents)
-					.skip(toEvict)
-					.transform(new Function<StreamRecord<IN>, IN>() {
-						@Override
-						public IN apply(StreamRecord<IN> input) {
-							return input.getValue();
-						}
-					});
-			userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
-		}
-		if (triggerResult.isPurge()) {
-			windowState.clear();
-			if (mergingWindows != null) {
-				mergingWindows.retireWindow(window);
+		do {
+			Timer<K, W> timer = processingTimeTimersQueue.peek();
+			if (timer != null && timer.timestamp <= time) {
+				fire = true;
+
+				processingTimeTimers.remove(timer);
+				processingTimeTimersQueue.remove();
+
+				context.key = timer.key;
+				context.window = timer.window;
+				setKeyContext(timer.key);
+
+				ListState<StreamRecord<IN>> windowState;
+				MergingWindowSet<W> mergingWindows = null;
+
+				if (windowAssigner instanceof MergingWindowAssigner) {
+					mergingWindows = getMergingWindowSet();
+					W stateWindow = mergingWindows.getStateWindow(context.window);
+					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+				} else {
+					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+				}
+
+				TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
+				fireOrContinue(triggerResult, context.window, windowState);
+
+				if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+					cleanup(timer.window, windowState, mergingWindows);
+				}
+
+			} else {
+				fire = false;
 			}
-			context.clear();
+		} while (fire);
+	}
+
+	private void fireOrContinue(TriggerResult triggerResult,
+								W window,
+								ListState<StreamRecord<IN>> windowState) throws Exception {
+		if (!triggerResult.isFire()) {
+			return;
+		}
+
+		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+		Iterable<StreamRecord<IN>> contents = windowState.get();
+
+		// Work around type system restrictions...
+		int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
+
+		FluentIterable<IN> projectedContents = FluentIterable
+			.from(contents)
+			.skip(toEvict)
+			.transform(new Function<StreamRecord<IN>, IN>() {
+				@Override
+				public IN apply(StreamRecord<IN> input) {
+					return input.getValue();
+				}
+			});
+		userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+	}
+
+	private void cleanup(W window,
+						ListState<StreamRecord<IN>> windowState,
+						MergingWindowSet<W> mergingWindows) throws Exception {
+
+		windowState.clear();
+		if (mergingWindows != null) {
+			mergingWindows.retireWindow(window);
 		}
+		context.clear();
+		deleteCleanupTimer(window);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 18020b3..95ad1b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -78,9 +79,9 @@ import static java.util.Objects.requireNonNull;
  * {@link Trigger}.
  *
  * <p>
- * When an element arrives it gets assigned a key using a {@link KeySelector} and it get's
+ * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
  * assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
- * is put into panes. A pane is the bucket of elements that have the same key and same
+ * is put into panes. A pane is the bucket of elements that have the same key and belong to the same
  * {@code Window}. An element can be in multiple panes of it was assigned to multiple windows by the
  * {@code WindowAssigner}.
  *
@@ -130,6 +131,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 */
 	protected final TypeSerializer<W> windowSerializer;
 
+	/**
+	 * The allowed lateness for elements. This is used for:
+	 * <ul>
+	 *     <li>Deciding if an element should be dropped from a window due to lateness.
+	 *     <li>Clearing the state of a window if the system time passes the
+	 *         {@code window.maxTimestamp + allowedLateness} landmark.
+	 * </ul>
+	 */
+	protected final long allowedLateness;
+
 	// ------------------------------------------------------------------------
 	// State that is not checkpointed
 	// ------------------------------------------------------------------------
@@ -177,7 +188,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		TypeSerializer<K> keySerializer,
 		StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
 		InternalWindowFunction<ACC, OUT, K, W> windowFunction,
-		Trigger<? super IN, ? super W> trigger) {
+		Trigger<? super IN, ? super W> trigger,
+		long allowedLateness) {
 
 		super(windowFunction);
 
@@ -189,6 +201,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		this.windowStateDescriptor = windowStateDescriptor;
 		this.trigger = requireNonNull(trigger);
 
+		Preconditions.checkArgument(allowedLateness >= 0);
+		this.allowedLateness = allowedLateness;
+
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
@@ -264,7 +279,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	@Override
 	@SuppressWarnings("unchecked")
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+		Collection<W> elementWindows = windowAssigner.assignWindows(
+			element.getValue(),
+			element.getTimestamp());
 
 		final K key = (K) getStateBackend().getCurrentKey();
 
@@ -276,7 +294,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				// element because we always eagerly merge
 				final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
 
-
 				// adding the new window might result in a merge, in that case the actualWindow
 				// is the merged window and we work with that. If we don't merge then
 				// actualWindow == window
@@ -294,18 +311,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 						for (W m: mergedWindows) {
 							context.window = m;
 							context.clear();
+							deleteCleanupTimer(m);
 						}
 
 						// merge the merged state windows into the newly resulting state window
-						getStateBackend().mergePartitionedStates(stateWindowResult,
-								mergedStateWindows,
-								windowSerializer,
-								(StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
+						getStateBackend().mergePartitionedStates(
+							stateWindowResult,
+							mergedStateWindows,
+							windowSerializer,
+							(StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
 					}
 				});
 
+				// check if the window is already inactive
+				if (isLate(actualWindow)) {
+					LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness.");
+					continue;
+				}
+
 				W stateWindow = mergingWindows.getStateWindow(actualWindow);
-				AppendingState<IN, ACC> windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+				AppendingState<IN, ACC> windowState = getPartitionedState(
+					stateWindow, windowSerializer, windowStateDescriptor);
 				windowState.add(element.getValue());
 
 				context.key = key;
@@ -314,104 +340,46 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				// we might have already fired because of a merge but still call onElement
 				// on the (possibly merged) window
 				TriggerResult triggerResult = context.onElement(element);
-
 				TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
+				fireOrContinue(combinedTriggerResult, actualWindow, windowState);
 
-				processTriggerResult(combinedTriggerResult, actualWindow);
+				if (combinedTriggerResult.isPurge()) {
+					cleanup(actualWindow, windowState, mergingWindows);
+				} else {
+					registerCleanupTimer(actualWindow);
+				}
 			}
-
 		} else {
 			for (W window: elementWindows) {
 
-				AppendingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
-						windowStateDescriptor);
+				// check if the window is already inactive
+				if (isLate(window)) {
+					LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
+					continue;
+				}
 
+				AppendingState<IN, ACC> windowState = getPartitionedState(
+					window, windowSerializer, windowStateDescriptor);
 				windowState.add(element.getValue());
 
 				context.key = key;
 				context.window = window;
-				TriggerResult triggerResult = context.onElement(element);
-
-				processTriggerResult(triggerResult, window);
-			}
-		}
-	}
-
-	/**
-	 * Retrieves the {@link MergingWindowSet} for the currently active key. The caller must
-	 * ensure that the correct key is set in the state backend.
-	 */
-	@SuppressWarnings("unchecked")
-	protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
-		MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
-		if (mergingWindows == null) {
-			// try to retrieve from state
-
-			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
-			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
-			ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
-
-			mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
-			mergeState.clear();
-
-			mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
-		}
-		return mergingWindows;
-	}
-
-
-	/**
-	 * Process {@link TriggerResult} for the currently active key and the given window. The caller
-	 * must ensure that the correct key is set in the state backend and the context object.
-	 */
-	@SuppressWarnings("unchecked")
-	protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
-		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
-			// do nothing
-			return;
-		}
-
-		AppendingState<IN, ACC> windowState;
-
-		MergingWindowSet<W> mergingWindows = null;
-
-		if (windowAssigner instanceof MergingWindowAssigner) {
-			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(window);
-			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-
-		} else {
-			windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
-		}
-
-		if (triggerResult.isFire()) {
-			timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-			ACC contents = windowState.get();
 
-			userFunction.apply(context.key, context.window, contents, timestampedCollector);
+				TriggerResult triggerResult = context.onElement(element);
+				fireOrContinue(triggerResult, window, windowState);
 
-		}
-		if (triggerResult.isPurge()) {
-			windowState.clear();
-			if (mergingWindows != null) {
-				mergingWindows.retireWindow(window);
+				if (triggerResult.isPurge()) {
+					cleanup(window, windowState, null);
+				} else {
+					registerCleanupTimer(window);
+				}
 			}
-			context.clear();
 		}
 	}
 
 	@Override
-	public final void processWatermark(Watermark mark) throws Exception {
-		processTriggersFor(mark);
-
-		output.emitWatermark(mark);
-
-		this.currentWatermark = mark.getTimestamp();
-	}
-
-	private void processTriggersFor(Watermark mark) throws Exception {
+	public void processWatermark(Watermark mark) throws Exception {
 		boolean fire;
-
 		do {
 			Timer<K, W> timer = watermarkTimersQueue.peek();
 			if (timer != null && timer.timestamp <= mark.getTimestamp()) {
@@ -423,16 +391,37 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.key = timer.key;
 				context.window = timer.window;
 				setKeyContext(timer.key);
+
+				AppendingState<IN, ACC> windowState;
+				MergingWindowSet<W> mergingWindows = null;
+
+				if (windowAssigner instanceof MergingWindowAssigner) {
+					mergingWindows = getMergingWindowSet();
+					W stateWindow = mergingWindows.getStateWindow(context.window);
+					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+				} else {
+					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+				}
+
 				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
-				processTriggerResult(triggerResult, context.window);
+				fireOrContinue(triggerResult, context.window, windowState);
+
+				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+					cleanup(timer.window, windowState, mergingWindows);
+				}
+
 			} else {
 				fire = false;
 			}
 		} while (fire);
+
+		output.emitWatermark(mark);
+
+		this.currentWatermark = mark.getTimestamp();
 	}
 
 	@Override
-	public final void trigger(long time) throws Exception {
+	public void trigger(long time) throws Exception {
 		boolean fire;
 
 		//Remove information about the triggering task
@@ -450,17 +439,154 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				context.key = timer.key;
 				context.window = timer.window;
 				setKeyContext(timer.key);
+
+				AppendingState<IN, ACC> windowState;
+				MergingWindowSet<W> mergingWindows = null;
+
+				if (windowAssigner instanceof MergingWindowAssigner) {
+					mergingWindows = getMergingWindowSet();
+					W stateWindow = mergingWindows.getStateWindow(context.window);
+					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+				} else {
+					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+				}
+
 				TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
-				processTriggerResult(triggerResult, context.window);
+				fireOrContinue(triggerResult, context.window, windowState);
+
+				if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
+					cleanup(timer.window, windowState, mergingWindows);
+				}
+
 			} else {
 				fire = false;
 			}
 		} while (fire);
+	}
+
+	/**
+	 * Cleans up the window state if the provided {@link TriggerResult} requires so, or if it
+	 * is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the
+	 * correct key is set in the state backend and the context object.
+	 */
+	private void cleanup(W window,
+						AppendingState<IN, ACC> windowState,
+						MergingWindowSet<W> mergingWindows) throws Exception {
+		windowState.clear();
+		if (mergingWindows != null) {
+			mergingWindows.retireWindow(window);
+		}
+		context.clear();
+		deleteCleanupTimer(window);
+	}
+
+	/**
+	 * Triggers the window computation if the provided {@link TriggerResult} requires so.
+	 * The caller must ensure that the correct key is set in the state backend and the context object.
+	 */
+	@SuppressWarnings("unchecked")
+	private void fireOrContinue(TriggerResult triggerResult,
+								W window,
+								AppendingState<IN, ACC> windowState) throws Exception {
+		if (!triggerResult.isFire()) {
+			return;
+		}
+
+		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
+		ACC contents = windowState.get();
+		userFunction.apply(context.key, context.window, contents, timestampedCollector);
+	}
+
+	/**
+	 * Retrieves the {@link MergingWindowSet} for the currently active key.
+	 * The caller must ensure that the correct key is set in the state backend.
+	 */
+	@SuppressWarnings("unchecked")
+	protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
+		MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
+		if (mergingWindows == null) {
+			// try to retrieve from state
+
+			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
+			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+			ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
+
+			mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
+			mergeState.clear();
+
+			mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
+		}
+		return mergingWindows;
+	}
 
-		// Also check any watermark timers. We might have some in here since
-		// Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered
-		// that is already behind the watermark.
-		processTriggersFor(new Watermark(currentWatermark));
+	/**
+	 * Decides if a window is currently late or not, based on the current
+	 * watermark, i.e. the current event time, and the allowed lateness.
+	 * @param window
+	 * 					The collection of windows returned by the {@link WindowAssigner}.
+	 * @return The windows (among the {@code eligibleWindows}) for which the element should still be
+	 * 					considered when triggering.
+	 */
+	protected boolean isLate(W window) {
+		return (windowAssigner.isEventTime() && (getCleanupTimeForWindow(window) <= currentWatermark));
+	}
+
+	/**
+	 * Registers a timer to cleanup the content of the window.
+	 * @param window
+	 * 					the window whose state to discard
+	 */
+	protected void registerCleanupTimer(W window) {
+		long cleanupTime = getCleanupTimeForWindow(window);
+		if (windowAssigner.isEventTime()) {
+			context.registerEventTimeTimer(cleanupTime);
+		} else {
+			context.registerProcessingTimeTimer(cleanupTime);
+		}
+	}
+
+	/**
+	 * Deletes the cleanup timer set for the contents of the provided window.
+	 * @param window
+	 * 					the window whose state to discard
+	 */
+	protected void deleteCleanupTimer(W window) {
+		long cleanupTime = getCleanupTimeForWindow(window);
+		if (windowAssigner.isEventTime()) {
+			context.deleteEventTimeTimer(cleanupTime);
+		} else {
+			context.deleteProcessingTimeTimer(cleanupTime);
+		}
+	}
+
+	/**
+	 * Returns the cleanup time for a window, which is
+	 * {@code window.maxTimestamp + allowedLateness}. In
+	 * case this leads to a value greated than {@link Long#MAX_VALUE}
+	 * then a cleanup time of {@link Long#MAX_VALUE} is
+	 * returned.
+	 *
+	 * @param window the window whose cleanup time we are computing.
+	 */
+	private long getCleanupTimeForWindow(W window) {
+		long cleanupTime = window.maxTimestamp() + allowedLateness;
+		return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
+	}
+
+	/**
+	 * Decides if it is time to clean up the window state.
+	 * Clean up time for a window is:
+	 * 		<li> if it is event time, after the watermark passes the end of the window plus the user-specified allowed lateness
+	 * 		<li> if it is processing time, after the processing time at the node passes the end of the window.
+	 * 	@param window
+	 * 					the window to clean
+	 *  @param time
+	 *  				the current time (event or processing depending on the {@link WindowAssigner}
+	 *  @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
+	 */
+	protected final boolean isCleanupTime(W window, long time) {
+		long cleanupTime = getCleanupTimeForWindow(window);
+		return  cleanupTime == time;
 	}
 
 	/**
@@ -562,13 +688,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			if (watermarkTimers.add(timer)) {
 				watermarkTimersQueue.add(timer);
 			}
-
-			if (time <= currentWatermark) {
-				// immediately schedule a trigger, so that we don't wait for the next
-				// watermark update to fire the watermark trigger
-				getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this);
-				//No need to put it in processingTimeTimerFutures as this timer is never removed
-			}
 		}
 
 		@Override
@@ -594,7 +713,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			if (watermarkTimers.remove(timer)) {
 				watermarkTimersQueue.remove(timer);
 			}
-
 		}
 
 		public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
@@ -627,7 +745,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 	}
 
-
 	/**
 	 * Internal class for keeping track of in-flight timers.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/34a8b03d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 6af7ac4..dc71440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -30,9 +30,13 @@ import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFu
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -45,6 +49,7 @@ import org.junit.Test;
 
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class EvictingWindowOperatorTest {
@@ -73,7 +78,8 @@ public class EvictingWindowOperatorTest {
 				stateDesc,
 				new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
 				CountTrigger.of(WINDOW_SLIDE),
-				CountEvictor.of(WINDOW_SIZE));
+				CountEvictor.of(WINDOW_SIZE),
+				0);
 
 		operator.setInputType(inputType, new ExecutionConfig());
 
@@ -144,7 +150,8 @@ public class EvictingWindowOperatorTest {
 			stateDesc,
 			new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
 			CountTrigger.of(WINDOW_SLIDE),
-			CountEvictor.of(WINDOW_SIZE));
+			CountEvictor.of(WINDOW_SIZE),
+			0);
 
 		operator.setInputType(inputType, new ExecutionConfig());
 
@@ -194,7 +201,69 @@ public class EvictingWindowOperatorTest {
 		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
 	}
 
-	// ------------------------------------------------------------------------
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTumblingWindowWithApply() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+
+		final int WINDOW_SIZE = 4;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents",
+			new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
+
+		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
+			TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+			new TimeWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
+			EventTimeTrigger.create(),
+			CountEvictor.of(WINDOW_SIZE),
+			0);
+
+		operator.setInputType(inputType, new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 100));
+
+		testHarness.processWatermark(new Watermark(1999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 1997));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 1998));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 2310)); // not late but more than 4
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 2310));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2310));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2310));
+
+		testHarness.processWatermark(new Watermark(3999));											 // now is the evictor
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		expectedOutput.add(new Watermark(1999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
+		expectedOutput.add(new Watermark(3999));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
+			new EvictingWindowOperatorTest.ResultSortComparator());
+		testHarness.close();
+	}
+
+		// ------------------------------------------------------------------------
 	//  UDFs
 	// ------------------------------------------------------------------------