You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/20 18:42:20 UTC

[1/4] flink git commit: Replace Trigger.onTime by Trigger.onProcessingTime/onEventTime

Repository: flink
Updated Branches:
  refs/heads/master e71196972 -> f760b616a


Replace Trigger.onTime by Trigger.onProcessingTime/onEventTime

This also renames WatermarkTrigger to EventTimeTrigger and
ContinuousWatermarkTrigger to ContinuousEventTimeTrigger.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f760b616
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f760b616
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f760b616

Branch: refs/heads/master
Commit: f760b616af0e1608cb4c190aeb264da72f624f4c
Parents: 4442269
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Oct 17 13:35:24 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 20 18:39:12 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    | 14 +--
 .../api/windowing/assigners/GlobalWindows.java  |  7 +-
 .../windowing/assigners/SlidingTimeWindows.java |  4 +-
 .../assigners/TumblingTimeWindows.java          |  4 +-
 .../triggers/ContinuousEventTimeTrigger.java    | 90 ++++++++++++++++++++
 .../ContinuousProcessingTimeTrigger.java        |  8 +-
 .../triggers/ContinuousWatermarkTrigger.java    | 84 ------------------
 .../api/windowing/triggers/CountTrigger.java    | 10 ++-
 .../api/windowing/triggers/DeltaTrigger.java    | 10 ++-
 .../windowing/triggers/EventTimeTrigger.java    | 62 ++++++++++++++
 .../triggers/ProcessingTimeTrigger.java         |  8 +-
 .../api/windowing/triggers/PurgingTrigger.java  | 17 +++-
 .../api/windowing/triggers/Trigger.java         | 25 ++++--
 .../windowing/triggers/WatermarkTrigger.java    | 56 ------------
 .../windowing/NonKeyedWindowOperator.java       |  6 +-
 .../operators/windowing/WindowOperator.java     |  6 +-
 .../windowing/AllWindowTranslationTest.java     |  8 +-
 .../windowing/EvictingWindowOperatorTest.java   |  1 -
 .../windowing/NonKeyedWindowOperatorTest.java   | 11 ++-
 .../operators/windowing/WindowOperatorTest.java | 10 +--
 .../windowing/WindowTranslationTest.java        |  8 +-
 .../examples/windowing/SessionWindowing.java    | 10 ++-
 22 files changed, 263 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 9257ae1..9fce0d7 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1008,7 +1008,7 @@ dataStream.union(otherStream1, otherStream2, ...)
     {% highlight scala %}
 dataStream.join(otherStream)
     .where(0).equalTo(1)
-    .onTimeWindow(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
     .apply { ... }
     {% endhighlight %}
           </td>
@@ -2308,7 +2308,7 @@ windowedStream.trigger(ProcessingTimeTrigger.create());
         The elements on the triggered window are henceforth discarded.
       </p>
 {% highlight java %}
-windowedStream.trigger(WatermarkTrigger.create());
+windowedStream.trigger(EventTimeTrigger.create());
 {% endhighlight %}
     </td>
   </tr>
@@ -2334,7 +2334,7 @@ windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SE
         The elements on the triggered window are retained.
       </p>
 {% highlight java %}
-windowedStream.trigger(ContinuousWatermarkTrigger.of(Time.of(5, TimeUnit.SECONDS)));
+windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));
 {% endhighlight %}
     </td>
   </tr>
@@ -2414,7 +2414,7 @@ windowedStream.trigger(ProcessingTimeTrigger.create);
         The elements on the triggered window are henceforth discarded.
       </p>
 {% highlight scala %}
-windowedStream.trigger(WatermarkTrigger.create);
+windowedStream.trigger(EventTimeTrigger.create);
 {% endhighlight %}
     </td>
   </tr>
@@ -2440,7 +2440,7 @@ windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SE
         The elements on the triggered window are retained.
       </p>
 {% highlight scala %}
-windowedStream.trigger(ContinuousWatermarkTrigger.of(Time.of(5, TimeUnit.SECONDS)));
+windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));
 {% endhighlight %}
     </td>
   </tr>
@@ -2653,7 +2653,7 @@ stream.timeWindow(Time.of(5, TimeUnit.SECONDS))
         <td>
     {% highlight java %}
 stream.window(TumblingTimeWindows.of((Time.of(5, TimeUnit.SECONDS)))
-  .trigger(WatermarkTrigger.create())
+  .trigger(EventTimeTrigger.create())
     {% endhighlight %}
         </td>
       </tr>
@@ -2667,7 +2667,7 @@ stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
         <td>
     {% highlight java %}
 stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
-  .trigger(WatermarkTrigger.create())
+  .trigger(EventTimeTrigger.create())
     {% endhighlight %}
         </td>
       </tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 9b7c8f2..4d5b9d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -79,7 +79,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 		}
 
 		@Override
-		public TriggerResult onTime(long time, TriggerContext ctx) {
+		public TriggerResult onEventTime(long time, TriggerContext ctx) {
+			return TriggerResult.CONTINUE;
+		}
+
+		@Override
+		public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 7b1f1f4..5f7ab45 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
 import java.util.ArrayList;
@@ -81,7 +81,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 		if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 			return ProcessingTimeTrigger.create();
 		} else {
-			return WatermarkTrigger.create();
+			return EventTimeTrigger.create();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index aa019e4..463b2c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
 import java.util.Collection;
@@ -67,7 +67,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 		if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 			return ProcessingTimeTrigger.create();
 		} else {
-			return WatermarkTrigger.create();
+			return EventTimeTrigger.create();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
new file mode 100644
index 0000000..ea26309
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that continuously fires based on a given time interval. This fires based
+ * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
+public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final long interval;
+
+	private ContinuousEventTimeTrigger(long interval) {
+		this.interval = interval;
+	}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
+
+		OperatorState<Boolean> first = ctx.getKeyValueState("first", true);
+
+		if (first.value()) {
+			long start = timestamp - (timestamp % interval);
+			long nextFireTimestamp = start + interval;
+
+			ctx.registerEventTimeTimer(nextFireTimestamp);
+
+			first.update(false);
+			return TriggerResult.CONTINUE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+		ctx.registerEventTimeTimer(time + interval);
+		return TriggerResult.FIRE;
+	}
+
+	@Override
+	public TriggerResult onProcessingTime(long time,
+			TriggerContext ctx) throws Exception {
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public String toString() {
+		return "ContinuousEventTimeTrigger(" + interval + ")";
+	}
+
+	@VisibleForTesting
+	public long getInterval() {
+		return interval;
+	}
+
+	/**
+	 * Creates a trigger that continuously fires based on the given interval.
+	 *
+	 * @param interval The time interval at which to fire.
+	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+	 */
+	public static <W extends Window> ContinuousEventTimeTrigger<W> of(AbstractTime interval) {
+		return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 3ea60f4..be56738 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -63,7 +63,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
+	public TriggerResult onEventTime(long time,
+			TriggerContext ctx) throws Exception {
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
 
 		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
 		long nextFireTimestamp = fireState.value();

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
deleted file mode 100644
index 494ba3a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A {@link Trigger} that continuously fires based on a given time interval. This fires based
- * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final long interval;
-
-	private ContinuousWatermarkTrigger(long interval) {
-		this.interval = interval;
-	}
-
-	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
-
-		OperatorState<Boolean> first = ctx.getKeyValueState("first", true);
-
-		if (first.value()) {
-			long start = timestamp - (timestamp % interval);
-			long nextFireTimestamp = start + interval;
-
-			ctx.registerWatermarkTimer(nextFireTimestamp);
-
-			first.update(false);
-			return TriggerResult.CONTINUE;
-		}
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
-		ctx.registerWatermarkTimer(time + interval);
-		return TriggerResult.FIRE;
-	}
-
-	@Override
-	public String toString() {
-		return "ContinuousProcessingTimeTrigger(" + interval + ")";
-	}
-
-	@VisibleForTesting
-	public long getInterval() {
-		return interval;
-	}
-
-	/**
-	 * Creates a trigger that continuously fires based on the given interval.
-	 *
-	 * @param interval The time interval at which to fire.
-	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
-	 */
-	public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) {
-		return new ContinuousWatermarkTrigger<>(interval.toMilliseconds());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 57582f7..8512989 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -49,8 +49,14 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
-		return null;
+	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onProcessingTime(long time,
+			TriggerContext ctx) throws Exception {
+		return TriggerResult.CONTINUE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index b1283f5..1c6523d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -59,8 +59,14 @@ public class DeltaTrigger<T extends Serializable, W extends Window> implements T
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
-		return null;
+	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onProcessingTime(long time,
+			TriggerContext ctx) throws Exception {
+		return TriggerResult.CONTINUE;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
new file mode 100644
index 0000000..4b6613c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+/**
+ * A {@link Trigger} that fires once the watermark passes the end of the window
+ * to which a pane belongs.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ */
+public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private EventTimeTrigger() {}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
+		ctx.registerEventTimeTimer(window.maxTimestamp());
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onEventTime(long time, TriggerContext ctx) {
+		return TriggerResult.FIRE_AND_PURGE;
+	}
+
+	@Override
+	public TriggerResult onProcessingTime(long time,
+			TriggerContext ctx) throws Exception {
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public String toString() {
+		return "EventTimeTrigger()";
+	}
+
+	/**
+	 * Creates a trigger that fires once the watermark passes the end of the window.
+	 */
+	public static EventTimeTrigger create() {
+		return new EventTimeTrigger();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 70c57ef..6278ba6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -35,7 +35,13 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
+	public TriggerResult onEventTime(long time,
+			TriggerContext ctx) throws Exception {
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
 		return TriggerResult.FIRE_AND_PURGE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 76e36b1..eaca336 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -53,8 +53,21 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onTime(time, ctx);
+	public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
+		TriggerResult triggerResult = nestedTrigger.onEventTime(time, ctx);
+		switch (triggerResult) {
+			case FIRE:
+				return TriggerResult.FIRE_AND_PURGE;
+			case FIRE_AND_PURGE:
+				return TriggerResult.FIRE_AND_PURGE;
+			default:
+				return TriggerResult.CONTINUE;
+		}
+	}
+
+	@Override
+	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
+		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, ctx);
 		switch (triggerResult) {
 			case FIRE:
 				return TriggerResult.FIRE_AND_PURGE;

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 56b8687..ef8110b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -55,12 +55,21 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
 
 	/**
-	 * Called when a timer that was set using the trigger context fires.
+	 * Called when a processing-time timer that was set using the trigger context fires.
 	 *
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onTime(long time, TriggerContext ctx) throws Exception;
+	TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception;
+
+	/**
+	 * Called when an event-time timer that was set using the trigger context fires.
+	 *
+	 * @param time The timestamp at which the timer fired.
+	 * @param ctx A context object that can be used to register timer callbacks.
+	 */
+	TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception;
+
 
 	/**
 	 * Result type for trigger methods. This determines what happens which the window.
@@ -82,21 +91,21 @@ public interface Trigger<T, W extends Window> extends Serializable {
 
 		/**
 		 * Register a system time callback. When the current system time passes the specified
-		 * time {@link #onTime(long, TriggerContext)} is called.
+		 * time {@link #onProcessingTime(long, TriggerContext)} is called with the time specified here.
 		 *
-		 * @param time The time at which to invoke {@link #onTime(long, TriggerContext)}
+		 * @param time The time at which to invoke {@link #onProcessingTime(long, TriggerContext)}
 		 */
 		void registerProcessingTimeTimer(long time);
 
 		/**
-		 * Register a watermark callback. When the current watermark passes the specified
-		 * time {@link #onTime(long, TriggerContext)} is called.
+		 * Register an event-time callback. When the current watermark passes the specified
+		 * time {@link #onEventTime(long, TriggerContext)} is called with the time specified here.
 		 *
 		 * @see org.apache.flink.streaming.api.watermark.Watermark
 		 *
-		 * @param time The watermark at which to invoke {@link #onTime(long, TriggerContext)}
+		 * @param time The watermark at which to invoke {@link #onEventTime(long, TriggerContext)}
 		 */
-		void registerWatermarkTimer(long time);
+		void registerEventTimeTimer(long time);
 
 		/**
 		 * Retrieves an {@link OperatorState} object that can be used to interact with

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
deleted file mode 100644
index d17066b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-/**
- * A {@link Trigger} that fires once the watermark passes the end of the window
- * to which a pane belongs.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
-public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private WatermarkTrigger() {}
-
-	@Override
-	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
-		ctx.registerWatermarkTimer(window.maxTimestamp());
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
-		return TriggerResult.FIRE_AND_PURGE;
-	}
-
-	@Override
-	public String toString() {
-		return "WatermarkTrigger()";
-	}
-
-	/**
-	 * Creates trigger that fires once the watermark passes the end of the window.
-	 */
-	public static WatermarkTrigger create() {
-		return new WatermarkTrigger();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 7ab33cf..5de6cd1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -420,7 +420,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		}
 
 		@Override
-		public void registerWatermarkTimer(long time) {
+		public void registerEventTimeTimer(long time) {
 			if (watermarkTimer == time) {
 				// we already have set a trigger for that time
 				return;
@@ -436,7 +436,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
-				return trigger.onTime(time, this);
+				return trigger.onProcessingTime(time, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}
@@ -444,7 +444,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onTime(time, this);
+				return trigger.onEventTime(time, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 0b3274f..2491c57 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -489,7 +489,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		}
 
 		@Override
-		public void registerWatermarkTimer(long time) {
+		public void registerEventTimeTimer(long time) {
 			if (watermarkTimer == time) {
 				// we already have set a trigger for that time
 				return;
@@ -505,7 +505,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
-				return trigger.onTime(time, this);
+				return trigger.onProcessingTime(time, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}
@@ -513,7 +513,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onTime(time, this);
+				return trigger.onEventTime(time, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 45ef29f..282c71f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
@@ -71,7 +71,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index afc65d5..1821308 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import jdk.nashorn.internal.objects.Global;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index a91d957..02e032a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
@@ -29,10 +28,10 @@ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
@@ -79,7 +78,7 @@ public class NonKeyedWindowOperatorTest {
 				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				WatermarkTrigger.create());
+				EventTimeTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -160,7 +159,7 @@ public class NonKeyedWindowOperatorTest {
 				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				WatermarkTrigger.create());
+				EventTimeTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -239,7 +238,7 @@ public class NonKeyedWindowOperatorTest {
 				new GlobalWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
+				ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index e825b88..b94e530 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -33,10 +33,10 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindow
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -82,7 +82,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				WatermarkTrigger.create());
+				EventTimeTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -171,7 +171,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
-				WatermarkTrigger.create());
+				EventTimeTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -256,7 +256,7 @@ public class WindowOperatorTest {
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
+				ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 02ec820..13766a1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
@@ -116,7 +116,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -140,7 +140,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -217,7 +217,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/f760b616/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 60b7894..3c63156 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -116,7 +116,7 @@ public class SessionWindowing {
 			// Update the last seen event time
 			lastSeenState.update(timestamp);
 
-			ctx.registerWatermarkTimer(lastSeen + sessionTimeout);
+			ctx.registerEventTimeTimer(lastSeen + sessionTimeout);
 
 			if (timeSinceLastEvent > sessionTimeout) {
 				return TriggerResult.FIRE_AND_PURGE;
@@ -126,7 +126,7 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
+		public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
 			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
 			Long lastSeen = lastSeenState.value();
 
@@ -135,6 +135,12 @@ public class SessionWindowing {
 			}
 			return TriggerResult.CONTINUE;
 		}
+
+		@Override
+		public TriggerResult onProcessingTime(long time,
+				TriggerContext ctx) throws Exception {
+			return TriggerResult.CONTINUE;
+		}
 	}
 
 	// *************************************************************************


[2/4] flink git commit: [FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
new file mode 100644
index 0000000..2733349
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.*;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows.
+ *
+ * <p>
+ * This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
+ */
+@SuppressWarnings("serial")
+public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
+
+	private static final int PARALLELISM = 4;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+
+	@BeforeClass
+	public static void startTestCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+
+		cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING);
+		cluster.start();
+	}
+
+	@AfterClass
+	public static void stopTestCluster() {
+		if (cluster != null) {
+			cluster.stop();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testTumblingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 100;
+		final int NUM_KEYS = 1;
+		FailingSource.reset();
+		
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+			
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS,
+							NUM_ELEMENTS_PER_KEY,
+							NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSlidingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 1000;
+		final int WINDOW_SLIDE = 100;
+		final int NUM_KEYS = 1;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPreAggregatedTumblingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 100;
+		final int NUM_KEYS = 1;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS,
+							NUM_ELEMENTS_PER_KEY,
+							NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.apply(
+							new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+								private boolean open = false;
+
+								@Override
+								public void open(Configuration parameters) {
+									assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+									open = true;
+								}
+
+								@Override
+								public Tuple2<Long, IntType> reduce(
+										Tuple2<Long, IntType> a,
+										Tuple2<Long, IntType> b) {
+
+									// validate that the function has been opened properly
+									assertTrue(open);
+									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+								}
+							},
+							new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPreAggregatedSlidingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 1000;
+		final int WINDOW_SLIDE = 100;
+		final int NUM_KEYS = 1;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS,
+							NUM_ELEMENTS_PER_KEY,
+							NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS),
+							Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.apply(
+							new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+								private boolean open = false;
+
+								@Override
+								public void open(Configuration parameters) {
+									assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+									open = true;
+								}
+
+								@Override
+								public Tuple2<Long, IntType> reduce(
+										Tuple2<Long, IntType> a,
+										Tuple2<Long, IntType> b) {
+
+									// validate that the function has been opened properly
+									assertTrue(open);
+									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+								}
+							},
+							new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+			// the sink signals completion by throwing SuccessException, which tryExecute accepts
+			tryExecute(env, "Sliding Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static class FailingSource extends RichEventTimeSourceFunction<Tuple2<Long, IntType>>
+			implements Checkpointed<Integer>, CheckpointNotifier
+	{
+		private static volatile boolean failedBefore = false;
+
+		private final int numKeys;
+		private final int numElementsToEmit;
+		private final int failureAfterNumElements;
+
+		private volatile int numElementsEmitted;
+		private volatile int numSuccessfulCheckpoints;
+		private volatile boolean running = true;
+
+		private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+			this.numKeys = numKeys;
+			this.numElementsToEmit = numElementsToEmitPerKey;
+			this.failureAfterNumElements = failureAfterNumElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			// non-parallel source
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+			// we loop longer than we have elements, to permit delayed checkpoints
+			// to still cause a failure
+			while (running) {
+
+				if (!failedBefore) {
+					// delay a bit, if we have not failed before
+					Thread.sleep(1);
+					if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+						// cause a failure if we have not failed before and have reached
+						// enough completed checkpoints and elements
+						failedBefore = true;
+						throw new Exception("Artificial Failure");
+					}
+				}
+
+				if (numElementsEmitted < numElementsToEmit &&
+						(failedBefore || numElementsEmitted <= failureAfterNumElements))
+				{
+					// the function failed before, or we are in the elements before the failure
+					synchronized (ctx.getCheckpointLock()) {
+						int next = numElementsEmitted++;
+						for (long i = 0; i < numKeys; i++) {
+							ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+						}
+						ctx.emitWatermark(new Watermark(next));
+					}
+				}
+				else {
+					// NOTE: self-termination is disabled (commented out below); this only prints diagnostics
+					if (numElementsEmitted > numElementsToEmit * 5) {
+//						running = false;
+						System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit);
+					}
+					// if our work is done, delay a bit to prevent busy waiting
+					Thread.sleep(1);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			numSuccessfulCheckpoints++;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return numElementsEmitted;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			numElementsEmitted = state;
+		}
+
+		public static void reset() {
+			failedBefore = false;
+		}
+	}
+
+	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+			implements Checkpointed<HashMap<Long, Integer>> {
+
+		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+		private final int numKeys;
+		private final int numWindowsExpected;
+
+		private ValidatingSink(int numKeys, int numWindowsExpected) {
+			this.numKeys = numKeys;
+			this.numWindowsExpected = numWindowsExpected;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() throws Exception {
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					}
+				}
+			}
+			assertTrue("The source must see all expected windows.", seenAll);
+		}
+
+		@Override
+		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+			// verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+			// the sum should be "sum (start .. end-1)"
+
+			int expectedSum = 0;
+			for (long i = value.f1; i < value.f2; i++) {
+				// only sum up positive vals, to filter out the negative start of the
+				// first sliding windows
+				if (i > 0) {
+					expectedSum += i;
+				}
+			}
+
+			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+
+			Integer curr = windowCounts.get(value.f0);
+			if (curr != null) {
+				windowCounts.put(value.f0, curr + 1);
+			}
+			else {
+				windowCounts.put(value.f0, 1);
+			}
+
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					} else if (windowCount > numWindowsExpected) {
+						fail("Window count to high: " + windowCount);
+					}
+				}
+
+				if (seenAll) {
+					// exit
+					throw new SuccessException();
+				}
+
+			}
+		}
+
+		@Override
+		public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+			return this.windowCounts;
+		}
+
+		@Override
+		public void restoreState(HashMap<Long, Integer> state) {
+			this.windowCounts.putAll(state);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception {
+		try {
+			env.execute(jobName);
+		}
+		catch (ProgramInvocationException | JobExecutionException root) {
+			Throwable cause = root.getCause();
+
+			// search for nested SuccessExceptions
+			int depth = 0;
+			while (!(cause instanceof SuccessException)) {
+				if (cause == null || depth++ == 20) {
+					root.printStackTrace();
+					fail("Test failed: " + root.getMessage());
+				}
+				else {
+					cause = cause.getCause();
+				}
+			}
+		}
+	}
+
+	public static class IntType {
+
+		public int value;
+
+		public IntType() {}
+
+		public IntType(int value) { this.value = value; }
+	}
+
+	static final class SuccessException extends Exception {
+		private static final long serialVersionUID = -9218191172606739598L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..4d1d2c3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichEventTimeSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.*;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows. This is more
+ * strict than {@link WindowCheckpointingITCase} because for event-time the contents
+ * of the emitted windows are deterministic.
+ */
+@SuppressWarnings("serial")
+public class EventTimeWindowCheckpointingITCase extends TestLogger {
+
+	private static final int PARALLELISM = 4;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+
+	@BeforeClass
+	public static void startTestCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+
+		cluster = new ForkableFlinkMiniCluster(config, false, StreamingMode.STREAMING);
+		cluster.start();
+	}
+
+	@AfterClass
+	public static void stopTestCluster() {
+		if (cluster != null) {
+			cluster.stop();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testTumblingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 100;
+		final int NUM_KEYS = 100;
+		FailingSource.reset();
+		
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+			
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSlidingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 1000;
+		final int WINDOW_SLIDE = 100;
+		final int NUM_KEYS = 100;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+			// the sink signals completion by throwing SuccessException, which tryExecute accepts
+			tryExecute(env, "Sliding Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPreAggregatedTumblingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 100;
+		final int NUM_KEYS = 100;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS))
+					.apply(
+							new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+								private boolean open = false;
+
+								@Override
+								public void open(Configuration parameters) {
+									assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+									open = true;
+								}
+
+								@Override
+								public Tuple2<Long, IntType> reduce(
+										Tuple2<Long, IntType> a,
+										Tuple2<Long, IntType> b) {
+
+									// validate that the function has been opened properly
+									assertTrue(open);
+									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+								}
+							},
+							new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPreAggregatedSlidingTimeWindow() {
+		final int NUM_ELEMENTS_PER_KEY = 3000;
+		final int WINDOW_SIZE = 1000;
+		final int WINDOW_SLIDE = 100;
+		final int NUM_KEYS = 100;
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			env
+					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+					.apply(
+							new RichReduceFunction<Tuple2<Long, IntType>>() {
+
+								private boolean open = false;
+
+								@Override
+								public void open(Configuration parameters) {
+									assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+									open = true;
+								}
+
+								@Override
+								public Tuple2<Long, IntType> reduce(
+										Tuple2<Long, IntType> a,
+										Tuple2<Long, IntType> b) {
+
+									// validate that the function has been opened properly
+									assertTrue(open);
+									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+								}
+							},
+							new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+			// the sink signals completion by throwing SuccessException, which tryExecute accepts
+			tryExecute(env, "Sliding Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static class FailingSource extends RichEventTimeSourceFunction<Tuple2<Long, IntType>>
+			implements Checkpointed<Integer>, CheckpointNotifier
+	{
+		private static volatile boolean failedBefore = false;
+
+		private final int numKeys;
+		private final int numElementsToEmit;
+		private final int failureAfterNumElements;
+
+		private volatile int numElementsEmitted;
+		private volatile int numSuccessfulCheckpoints;
+		private volatile boolean running = true;
+
+		private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+			this.numKeys = numKeys;
+			this.numElementsToEmit = numElementsToEmitPerKey;
+			this.failureAfterNumElements = failureAfterNumElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			// non-parallel source
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+			// we loop longer than we have elements, to permit delayed checkpoints
+			// to still cause a failure
+			while (running) {
+
+				if (!failedBefore) {
+					// delay a bit, if we have not failed before
+					Thread.sleep(1);
+					if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+						// cause a failure if we have not failed before and have reached
+						// enough completed checkpoints and elements
+						failedBefore = true;
+						throw new Exception("Artificial Failure");
+					}
+				}
+
+				if (numElementsEmitted < numElementsToEmit &&
+						(failedBefore || numElementsEmitted <= failureAfterNumElements))
+				{
+					// the function failed before, or we are in the elements before the failure
+					synchronized (ctx.getCheckpointLock()) {
+						int next = numElementsEmitted++;
+						for (long i = 0; i < numKeys; i++) {
+							ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+						}
+						ctx.emitWatermark(new Watermark(next));
+					}
+				}
+				else {
+					// NOTE: self-termination is disabled (commented out below); this only prints diagnostics
+					if (numElementsEmitted > numElementsToEmit * 5) {
+//						running = false;
+						System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit);
+					}
+
+					// if our work is done, delay a bit to prevent busy waiting
+					Thread.sleep(1);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			numSuccessfulCheckpoints++;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return numElementsEmitted;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			numElementsEmitted = state;
+		}
+
+		public static void reset() {
+			failedBefore = false;
+		}
+	}
+
+	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+			implements Checkpointed<HashMap<Long, Integer>> {
+
+		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+		private final int numKeys;
+		private final int numWindowsExpected;
+
+		private ValidatingSink(int numKeys, int numWindowsExpected) {
+			this.numKeys = numKeys;
+			this.numWindowsExpected = numWindowsExpected;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() throws Exception {
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					}
+				}
+			}
+			assertTrue("The source must see all expected windows.", seenAll);
+		}
+
+		@Override
+		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+			// verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+			// the sum should be "sum (start .. end-1)"
+
+			int expectedSum = 0;
+			for (long i = value.f1; i < value.f2; i++) {
+				// only sum up positive vals, to filter out the negative start of the
+				// first sliding windows
+				if (i > 0) {
+					expectedSum += i;
+				}
+			}
+
+			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+
+			Integer curr = windowCounts.get(value.f0);
+			if (curr != null) {
+				windowCounts.put(value.f0, curr + 1);
+			}
+			else {
+				windowCounts.put(value.f0, 1);
+			}
+
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					} else if (windowCount > numWindowsExpected) {
+						fail("Window count to high: " + windowCount);
+					}
+				}
+
+				if (seenAll) {
+					// exit
+					throw new SuccessException();
+				}
+
+			}
+		}
+
+		@Override
+		public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+			return this.windowCounts;
+		}
+
+		@Override
+		public void restoreState(HashMap<Long, Integer> state) {
+			this.windowCounts.putAll(state);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception {
+		try {
+			env.execute(jobName);
+		}
+		catch (ProgramInvocationException | JobExecutionException root) {
+			Throwable cause = root.getCause();
+
+			// search for nested SuccessExceptions
+			int depth = 0;
+			while (!(cause instanceof SuccessException)) {
+				if (cause == null || depth++ == 20) {
+					root.printStackTrace();
+					fail("Test failed: " + root.getMessage());
+				}
+				else {
+					cause = cause.getCause();
+				}
+			}
+		}
+	}
+
+	public static class IntType {
+
+		public int value;
+
+		public IntType() {}
+
+		public IntType(int value) { this.value = value; }
+	}
+
+	static final class SuccessException extends Exception {
+		private static final long serialVersionUID = -9218191172606739598L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 298ae5c..e297486 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -36,13 +36,20 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -56,8 +63,15 @@ import static org.junit.Assert.fail;
  * serializability is handled correctly.
  */
 @SuppressWarnings("serial")
+@RunWith(Parameterized.class)
 public class WindowCheckpointingITCase extends TestLogger {
 
+	private TimeCharacteristic timeCharacteristic;
+
+	public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) {
+		this.timeCharacteristic = timeCharacteristic;
+	}
+
 	private static final int PARALLELISM = 4;
 
 	private static ForkableFlinkMiniCluster cluster;
@@ -94,7 +108,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 					"localhost", cluster.getLeaderRPCPort());
 			
 			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+			env.setStreamTimeCharacteristic(timeCharacteristic);
+			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
 			env.setNumberOfExecutionRetries(3);
 			env.getConfig().disableSysoutLogging();
@@ -151,7 +166,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 					"localhost", cluster.getLeaderRPCPort());
 			
 			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+			env.setStreamTimeCharacteristic(timeCharacteristic);
+			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
 			env.setNumberOfExecutionRetries(3);
 			env.getConfig().disableSysoutLogging();
@@ -208,7 +224,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 					"localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+			env.setStreamTimeCharacteristic(timeCharacteristic);
+			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
 			env.setNumberOfExecutionRetries(3);
 			env.getConfig().disableSysoutLogging();
@@ -266,7 +283,8 @@ public class WindowCheckpointingITCase extends TestLogger {
 					"localhost", cluster.getLeaderRPCPort());
 
 			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+			env.setStreamTimeCharacteristic(timeCharacteristic);
+			env.getConfig().setAutoWatermarkInterval(10);
 			env.enableCheckpointing(100);
 			env.setNumberOfExecutionRetries(3);
 			env.getConfig().disableSysoutLogging();
@@ -462,6 +480,18 @@ public class WindowCheckpointingITCase extends TestLogger {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Parametrization for testing different time characteristics
+	// ------------------------------------------------------------------------
+
+	@Parameterized.Parameters(name = "TimeCharacteristic = {0}")
+	@SuppressWarnings("unchecked,rawtypes")
+	public static Collection<TimeCharacteristic[]> timeCharacteristic(){
+		return Arrays.asList(new TimeCharacteristic[]{TimeCharacteristic.ProcessingTime},
+				new TimeCharacteristic[]{TimeCharacteristic.IngestionTime}
+		);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
@@ -498,4 +528,4 @@ public class WindowCheckpointingITCase extends TestLogger {
 	static final class SuccessException extends Exception {
 		private static final long serialVersionUID = -9218191172606739598L;
 	}
-}
\ No newline at end of file
+}


[4/4] flink git commit: [FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant

Posted by al...@apache.org.
[FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant

This adds method state() on Trigger context that should be used to
create an OperatorState to deal with fault-tolerant state.

WindowAssigner now has a method getWindowSerializer() that is used to
get a TypeSerializer for the Windows that it assigns. The Serializer for
the Key is retrieved from the input KeyedStream and the serializer for
the input elements is already available.

During checkpointing all currently in-flight windows (per key, per
window) are serialized using the TypeSerializers. The state that is
accessible in Triggers using state() is kept in a
HashMap<String, Serializable>, which is serialized using Java
serialization.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44422697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44422697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44422697

Branch: refs/heads/master
Commit: 444226970e31856787fbebdd7793805293faf13b
Parents: e711969
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 11 11:37:29 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 20 18:39:12 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |   1 +
 .../api/datastream/AllWindowedStream.java       |  73 ++-
 .../api/datastream/WindowedStream.java          |  83 ++-
 .../source/RichEventTimeSourceFunction.java     |  47 ++
 .../api/windowing/assigners/GlobalWindows.java  |  10 +-
 .../windowing/assigners/SlidingTimeWindows.java |   9 +-
 .../assigners/TumblingTimeWindows.java          |   8 +-
 .../api/windowing/assigners/WindowAssigner.java |   8 +
 .../ContinuousProcessingTimeTrigger.java        |  28 +-
 .../triggers/ContinuousWatermarkTrigger.java    |  20 +-
 .../api/windowing/triggers/CountTrigger.java    |  22 +-
 .../api/windowing/triggers/DeltaTrigger.java    |  29 +-
 .../triggers/ProcessingTimeTrigger.java         |  12 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   9 +-
 .../api/windowing/triggers/Trigger.java         |  29 +-
 .../windowing/triggers/WatermarkTrigger.java    |  14 +-
 .../api/windowing/windows/GlobalWindow.java     |  97 ++-
 .../api/windowing/windows/TimeWindow.java       | 100 ++-
 .../streaming/api/windowing/windows/Window.java |  12 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   2 +-
 .../EvictingNonKeyedWindowOperator.java         |  37 +-
 .../windowing/EvictingWindowOperator.java       |  53 +-
 .../windowing/NonKeyedWindowOperator.java       | 379 +++++++++---
 .../operators/windowing/WindowOperator.java     | 421 ++++++++++---
 .../windowing/AllWindowTranslationTest.java     |  12 +-
 .../EvictingNonKeyedWindowOperatorTest.java     |   1 +
 .../windowing/EvictingWindowOperatorTest.java   |   4 +
 .../windowing/NonKeyedWindowOperatorTest.java   |   4 +
 .../operators/windowing/WindowOperatorTest.java |   9 +
 .../windowing/WindowTranslationTest.java        |  12 +-
 .../examples/windowing/SessionWindowing.java    |  28 +-
 .../streaming/api/scala/AllWindowedStream.scala |  52 ++
 .../streaming/api/scala/WindowedStream.scala    |  56 ++
 .../api/scala/AllWindowTranslationTest.scala    |  72 ++-
 .../api/scala/WindowTranslationTest.scala       |  68 ++-
 .../EventTimeAllWindowCheckpointingITCase.java  | 603 ++++++++++++++++++
 .../EventTimeWindowCheckpointingITCase.java     | 605 +++++++++++++++++++
 .../WindowCheckpointingITCase.java              |  40 +-
 38 files changed, 2665 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 9ed3e92..f38ecb0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -151,6 +151,7 @@ public class ExecutionConfig implements Serializable {
 	 * @param interval The interval between watermarks in milliseconds.
 	 */
 	public ExecutionConfig setAutoWatermarkInterval(long interval) {
+		enableTimestamps();
 		this.autoWatermarkInterval = interval;
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 83e7adc..7191304 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -146,6 +146,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					new ReduceAllWindowFunction<W, T>(function),
 					trigger,
@@ -157,6 +158,7 @@ public class AllWindowedStream<T, W extends Window> {
 			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
 
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
 					new ReduceAllWindowFunction<W, T>(function),
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -232,7 +234,7 @@ public class AllWindowedStream<T, W extends Window> {
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "MapWindow at " + callLocation;
+		String udfName = "WindowApply at " + callLocation;
 
 		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
 		if (result != null) {
@@ -248,6 +250,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger,
@@ -255,6 +258,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		} else {
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -263,6 +267,73 @@ public class AllWindowedStream<T, W extends Window> {
 		return input.transform(opName, resultType, operator).setParallelism(1);
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, AllWindowFunction.class, true, true, inType, null, false);
+
+		return apply(preAggregator, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, R> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Aggregations on the  windows
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index f1220de..033e84f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -158,7 +158,9 @@ public class WindowedStream<T, K, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					new ReduceWindowFunction<K, W, T>(function),
 					trigger,
@@ -170,7 +172,9 @@ public class WindowedStream<T, K, W extends Window> {
 			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
 
 			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
 					new ReduceWindowFunction<K, W, T>(function),
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -241,6 +245,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * is evaluated, as the function provides no means of pre-aggregation.
 	 *
 	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
@@ -248,7 +253,7 @@ public class WindowedStream<T, K, W extends Window> {
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "MapWindow at " + callLocation;
+		String udfName = "WindowApply at " + callLocation;
 
 		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
 		if (result != null) {
@@ -265,7 +270,9 @@ public class WindowedStream<T, K, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger,
@@ -273,7 +280,9 @@ public class WindowedStream<T, K, W extends Window> {
 
 		} else {
 			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -282,6 +291,78 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, WindowFunction.class, true, true, inType, null, false);
+
+		return apply(preAggregator, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Aggregations on the keyed windows
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
new file mode 100644
index 0000000..6e0086d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for implementing a parallel event-time data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}).
+ *
+ * <p>
+ * This class is useful when implementing parallel sources where different parallel subtasks
+ * need to perform different work. Typical patterns for that are:
+ * <ul>
+ *     <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
+ *         to determine the current parallelism. It is strongly encouraged to use this method, rather than
+ *         hard-wiring the parallelism, because the configured parallelism may change depending on
+ *         program configuration. The parallelism may also change after recovering from failures, when fewer
+ *         than the desired number of parallel workers are available.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
+ *         determine which subtask the current instance of the function executes.</li>
+ * </ul>
+ *
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichEventTimeSourceFunction<OUT> extends AbstractRichFunction implements EventTimeSourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 66c3287..9b7c8f2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -80,10 +82,10 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 		public TriggerResult onTime(long time, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
+	}
 
-		@Override
-		public Trigger<Object, GlobalWindow> duplicate() {
-			return this;
-		}
+	@Override
+	public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new GlobalWindow.Serializer();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 6036dfb..7b1f1f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
@@ -61,7 +63,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 		for (long start = lastStart;
 			start > timestamp - size;
 			start -= slide) {
-			windows.add(new TimeWindow(start, size));
+			windows.add(new TimeWindow(start, start + size));
 		}
 		return windows;
 	}
@@ -99,4 +101,9 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
 		return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
 	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index d57dc33..aa019e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
@@ -53,7 +55,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
 		long start = timestamp - (timestamp % size);
-		return Collections.singletonList(new TimeWindow(start, size));
+		return Collections.singletonList(new TimeWindow(start, start + size));
 	}
 
 	public long getSize() {
@@ -85,4 +87,8 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 		return new TumblingTimeWindows(size.toMilliseconds());
 	}
 
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index d0b1ed0..4b4b1ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -52,4 +54,10 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
 	 * Returns the default trigger associated with this {@code WindowAssigner}.
 	 */
 	public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
+
+	/**
+	 * Returns a {@link TypeSerializer} for serializing windows that are assigned by
+	 * this {@code WindowAssigner}.
+	 */
+	public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index f23f6ee..3ea60f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -30,27 +31,29 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long interval;
-
-	private long nextFireTimestamp = 0;
+	private final long interval;
 
 	private ContinuousProcessingTimeTrigger(long interval) {
 		this.interval = interval;
 	}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
 		long currentTime = System.currentTimeMillis();
+
+		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
+		long nextFireTimestamp = fireState.value();
+
 		if (nextFireTimestamp == 0) {
 			long start = currentTime - (currentTime % interval);
-			nextFireTimestamp = start + interval;
+			fireState.update(start + interval);
 
 			ctx.registerProcessingTimeTimer(nextFireTimestamp);
 			return TriggerResult.CONTINUE;
 		}
 		if (currentTime > nextFireTimestamp) {
 			long start = currentTime - (currentTime % interval);
-			nextFireTimestamp = start + interval;
+			fireState.update(start + interval);
 
 			ctx.registerProcessingTimeTimer(nextFireTimestamp);
 
@@ -60,22 +63,21 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
+	public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
+
+		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
+		long nextFireTimestamp = fireState.value();
+
 		// only fire if an element didn't already fire
 		long currentTime = System.currentTimeMillis();
 		if (currentTime > nextFireTimestamp) {
 			long start = currentTime - (currentTime % interval);
-			nextFireTimestamp = start + interval;
+			fireState.update(start + interval);
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
 	}
 
-	@Override
-	public Trigger<Object, W> duplicate() {
-		return new ContinuousProcessingTimeTrigger<>(interval);
-	}
-
 	@VisibleForTesting
 	public long getInterval() {
 		return interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index 02ea81d..494ba3a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -32,22 +33,24 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long interval;
-
-	private boolean first = true;
+	private final long interval;
 
 	private ContinuousWatermarkTrigger(long interval) {
 		this.interval = interval;
 	}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
-		if (first) {
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
+
+		OperatorState<Boolean> first = ctx.getKeyValueState("first", true);
+
+		if (first.value()) {
 			long start = timestamp - (timestamp % interval);
 			long nextFireTimestamp = start + interval;
 
 			ctx.registerWatermarkTimer(nextFireTimestamp);
-			first = false;
+
+			first.update(false);
 			return TriggerResult.CONTINUE;
 		}
 		return TriggerResult.CONTINUE;
@@ -60,11 +63,6 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
 	}
 
 	@Override
-	public Trigger<Object, W> duplicate() {
-		return new ContinuousWatermarkTrigger<>(interval);
-	}
-
-	@Override
 	public String toString() {
 		return "ContinuousProcessingTimeTrigger(" + interval + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 53480fe..57582f7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -17,8 +17,11 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+import java.io.IOException;
+
 /**
  * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
  *
@@ -27,19 +30,19 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long maxCount;
-	private long count;
+	private final long maxCount;
 
 	private CountTrigger(long maxCount) {
 		this.maxCount = maxCount;
-		count = 0;
 	}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
-		count++;
-		if (count >= maxCount) {
-			count = 0;
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
+		OperatorState<Long> count = ctx.getKeyValueState("count", 0L);
+		long currentCount = count.value() + 1;
+		count.update(currentCount);
+		if (currentCount >= maxCount) {
+			count.update(0L);
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
@@ -51,11 +54,6 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	}
 
 	@Override
-	public Trigger<Object, W> duplicate() {
-		return new CountTrigger<>(maxCount);
-	}
-
-	@Override
 	public String toString() {
 		return "CountTrigger(" +  maxCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index cf4cf0c..b1283f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -17,9 +17,12 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+import java.io.Serializable;
+
 /**
  * A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold.
  *
@@ -30,12 +33,11 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
-public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+public class DeltaTrigger<T extends Serializable, W extends Window> implements Trigger<T, W> {
 	private static final long serialVersionUID = 1L;
 
-	DeltaFunction<T> deltaFunction;
-	private double threshold;
-	private transient T lastElement;
+	private final DeltaFunction<T> deltaFunction;
+	private final double threshold;
 
 	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
 		this.deltaFunction = deltaFunction;
@@ -43,13 +45,14 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
-		if (lastElement == null) {
-			lastElement = element;
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
+		OperatorState<T> lastElementState = ctx.getKeyValueState("last-element", null);
+		if (lastElementState.value() == null) {
+			lastElementState.update(element);
 			return TriggerResult.CONTINUE;
 		}
-		if (deltaFunction.getDelta(lastElement, element) > this.threshold) {
-			lastElement = element;
+		if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
+			lastElementState.update(element);
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
@@ -61,11 +64,6 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public Trigger<T, W> duplicate() {
-		return new DeltaTrigger<>(threshold, deltaFunction);
-	}
-
-	@Override
 	public String toString() {
 		return "DeltaTrigger(" +  deltaFunction + ", " + threshold + ")";
 	}
@@ -78,9 +76,8 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	 *
 	 * @param <T> The type of elements on which this trigger can operate.
 	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
-	 * @return
 	 */
-	public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+	public static <T extends Serializable, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
 		return new DeltaTrigger<>(threshold, deltaFunction);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index cc3440c..70c57ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -26,16 +26,11 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
-	boolean isFirst = true;
-
 	private ProcessingTimeTrigger() {}
 
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		if (isFirst) {
-			ctx.registerProcessingTimeTimer(window.getEnd());
-			isFirst = false;
-		}
+		ctx.registerProcessingTimeTimer(window.maxTimestamp());
 		return TriggerResult.CONTINUE;
 	}
 
@@ -45,11 +40,6 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public Trigger<Object, TimeWindow> duplicate() {
-		return new ProcessingTimeTrigger();
-	}
-
-	@Override
 	public String toString() {
 		return "ProcessingTimeTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 1c896a7..76e36b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -40,7 +40,7 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
 		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
 		switch (triggerResult) {
 			case FIRE:
@@ -53,7 +53,7 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
+	public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
 		TriggerResult triggerResult = nestedTrigger.onTime(time, ctx);
 		switch (triggerResult) {
 			case FIRE:
@@ -66,11 +66,6 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public Trigger<T, W> duplicate() {
-		return new PurgingTrigger<>(nestedTrigger.duplicate());
-	}
-
-	@Override
 	public String toString() {
 		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index f9e2e3e..56b8687 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+
 import java.io.Serializable;
 
 /**
@@ -31,6 +33,11 @@ import java.io.Serializable;
  * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
  * have their own instance of the {@code Trigger}.
  *
+ * <p>
+ * Triggers must not maintain state internally since they can be re-created or reused for
+ * different keys. All necessary state should be persisted using the state abstraction
+ * available on the {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext}.
+ *
  * @param <T> The type of elements on which this {@code Trigger} works.
  * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
  */
@@ -45,7 +52,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param window The window to which this pane belongs.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
+	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
 
 	/**
 	 * Called when a timer that was set using the trigger context fires.
@@ -53,13 +60,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onTime(long time, TriggerContext ctx);
-
-	/**
-	 * Creates a duplicate of the {@code Trigger} without the state of the original {@code Trigger}.
-	 * @return The duplicate {@code Trigger} object.
-	 */
-	Trigger<T, W> duplicate();
+	TriggerResult onTime(long time, TriggerContext ctx) throws Exception;
 
 	/**
 	 * Result type for trigger methods. This determines what happens which the window.
@@ -75,7 +76,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 
 	/**
 	 * A context object that is given to {@code Trigger} methods to allow them to register timer
-	 * callbacks.
+	 * callbacks and deal with state.
 	 */
 	interface TriggerContext {
 
@@ -96,5 +97,15 @@ public interface Trigger<T, W extends Window> extends Serializable {
 		 * @param time The watermark at which to invoke {@link #onTime(long, TriggerContext)}
 		 */
 		void registerWatermarkTimer(long time);
+
+		/**
+		 * Retrieves an {@link OperatorState} object that can be used to interact with
+		 * fault-tolerant state that is scoped to the window and key of the current
+		 * trigger invocation.
+		 *
+		 * @param name A unique key for the state.
+		 * @param defaultState The default value of the state.
+		 */
+		<S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
index 5d66ba3..d17066b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -28,16 +28,11 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
-	boolean isFirst = true;
-
 	private WatermarkTrigger() {}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		if (isFirst) {
-			ctx.registerWatermarkTimer(window.maxTimestamp());
-			isFirst = false;
-		}
+	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
+		ctx.registerWatermarkTimer(window.maxTimestamp());
 		return TriggerResult.CONTINUE;
 	}
 
@@ -47,11 +42,6 @@ public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public Trigger<Object, TimeWindow> duplicate() {
-		return new WatermarkTrigger();
-	}
-
-	@Override
 	public String toString() {
 		return "WatermarkTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
index e0df19d..f20c779 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -17,6 +17,12 @@
  */
 package org.apache.flink.streaming.api.windowing.windows;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
 public class GlobalWindow extends Window {
 
 	private static GlobalWindow INSTANCE = new GlobalWindow();
@@ -28,29 +34,13 @@ public class GlobalWindow extends Window {
 	}
 
 	@Override
-	public long getStart() {
-		return Long.MIN_VALUE;
-	}
-
-	@Override
-	public long getEnd() {
-		return Long.MAX_VALUE;
-	}
-
-	@Override
 	public long maxTimestamp() {
 		return Long.MAX_VALUE;
 	}
 
 	@Override
 	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		return true;
+		return this == o || !(o == null || getClass() != o.getClass());
 	}
 
 	@Override
@@ -62,4 +52,77 @@ public class GlobalWindow extends Window {
 	public String toString() {
 		return "GlobalWindow";
 	}
+
+	public static class Serializer extends TypeSerializer<GlobalWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<GlobalWindow> duplicate() {
+			return this;
+		}
+
+		@Override
+		public GlobalWindow createInstance() {
+			return GlobalWindow.INSTANCE;
+		}
+
+		@Override
+		public GlobalWindow copy(GlobalWindow from) {
+			return from;
+		}
+
+		@Override
+		public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
+			target.writeByte(0);
+		}
+
+		@Override
+		public GlobalWindow deserialize(DataInputView source) throws IOException {
+			source.readByte();
+			return GlobalWindow.INSTANCE;
+		}
+
+		@Override
+		public GlobalWindow deserialize(GlobalWindow reuse,
+				DataInputView source) throws IOException {
+			source.readByte();
+			return GlobalWindow.INSTANCE;
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			source.readByte();
+			target.writeByte(0);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 20080c0..0c4c2a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -17,31 +17,37 @@
  */
 package org.apache.flink.streaming.api.windowing.windows;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * A {@link Window} that represents a time interval from {@code start} (inclusive) to
+ * {@code end} (exclusive).
+ */
 public class TimeWindow extends Window {
-	long start;
-	long end;
 
-	public TimeWindow() {
-	}
+	private final long start;
+	private final long end;
 
-	public TimeWindow(long start, long size) {
+	public TimeWindow(long start, long end) {
 		this.start = start;
-		this.end = start + size - 1;
+		this.end = end;
 	}
 
-	@Override
 	public long getStart() {
 		return start;
 	}
 
-	@Override
 	public long getEnd() {
 		return end;
 	}
 
 	@Override
 	public long maxTimestamp() {
-		return end;
+		return end - 1;
 	}
 
 	@Override
@@ -72,4 +78,80 @@ public class TimeWindow extends Window {
 				", end=" + end +
 				'}';
 	}
+
+	public static class Serializer extends TypeSerializer<TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<TimeWindow> duplicate() {
+			return this;
+		}
+
+		@Override
+		public TimeWindow createInstance() {
+			return null;
+		}
+
+		@Override
+		public TimeWindow copy(TimeWindow from) {
+			return from;
+		}
+
+		@Override
+		public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(TimeWindow record, DataOutputView target) throws IOException {
+			target.writeLong(record.start);
+			target.writeLong(record.end);
+		}
+
+		@Override
+		public TimeWindow deserialize(DataInputView source) throws IOException {
+			long start = source.readLong();
+			long end = source.readLong();
+			return new TimeWindow(start, end);
+		}
+
+		@Override
+		public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
+			long start = source.readLong();
+			long end = source.readLong();
+			return new TimeWindow(start, end);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.writeLong(source.readLong());
+			target.writeLong(source.readLong());
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
index 4e22c32..2e415f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -17,11 +17,15 @@
  */
 package org.apache.flink.streaming.api.windowing.windows;
 
+/**
+ * A {@code Window} is a grouping of elements into finite buckets. Windows have a maximum timestamp
+ * which means that, at some point, all elements that go into one window will have arrived.
+ *
+ * <p>
+ * Subclasses should implement {@code equals()} and {@code hashCode()} so that logically
+ * equal windows are treated as the same window.
+ */
 public abstract class Window {
 
-	public abstract long getStart();
-
-	public abstract long getEnd();
-
 	public abstract long maxTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 9964760..3165f88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -239,7 +239,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	private void computeWindow(long timestamp) throws Exception {
 		out.setTimestamp(timestamp);
 		panes.truncatePanes(numPanesPerWindow);
-		panes.evaluateWindow(out, new TimeWindow(timestamp, windowSize));
+		panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize));
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index bd3572e..1bb451a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -18,17 +18,14 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -45,51 +42,35 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(EvictingNonKeyedWindowOperator.class);
-
 	private final Evictor<? super IN, ? super W> evictor;
 
 	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
 			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, windowBufferFactory, windowFunction, trigger);
+		super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
 		this.evictor = requireNonNull(evictor);
 	}
 
 	@Override
 	@SuppressWarnings("unchecked, rawtypes")
-	protected void emitWindow(W window, boolean purge) throws Exception {
-
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = windows.remove(window);
-		} else {
-			bufferAndTrigger = windows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} already gone.", window);
-			return;
-		}
-
-
-		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
+		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
 
 		int toEvict = 0;
 		if (windowBuffer.size() > 0) {
 			// need some type trickery here...
-			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), window);
+			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
 		}
 
 		windowBuffer.removeElements(toEvict);
 
 		userFunction.apply(
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 51413bd..ad43812 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -18,20 +18,15 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
@@ -51,64 +46,38 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(EvictingWindowOperator.class);
-
 	private final Evictor<? super IN, ? super W> evictor;
 
 	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
 			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
 			WindowFunction<IN, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
+		super(windowAssigner, windowSerializer, keySelector, keySerializer, windowBufferFactory, windowFunction, trigger);
 		this.evictor = requireNonNull(evictor);
 	}
 
 	@Override
 	@SuppressWarnings("unchecked, rawtypes")
-	protected void emitWindow(K key, W window, boolean purge) throws Exception {
-
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
-
-		if (keyWindows == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = keyWindows.remove(window);
-		} else {
-			bufferAndTrigger = keyWindows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
-
-
-		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
+		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
 
 		int toEvict = 0;
 		if (windowBuffer.size() > 0) {
 			// need some type trickery here...
-			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), window);
+			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
 		}
 
 		windowBuffer.removeElements(toEvict);
 
-		userFunction.apply(key,
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+		userFunction.apply(context.key,
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
-
-		if (keyWindows.isEmpty()) {
-			windows.remove(key);
-		}
 	}
 
 	@Override


[3/4] flink git commit: [FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index aecfd5d..7ab33cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,12 +18,15 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -37,11 +40,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,26 +73,70 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(NonKeyedWindowOperator.class);
+	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
 
+	// ------------------------------------------------------------------------
+	// Configuration values and stuff from the user
+	// ------------------------------------------------------------------------
 
 	private final WindowAssigner<? super IN, W> windowAssigner;
 
-	private final Trigger<? super IN, ? super W> triggerTemplate;
+	private final Trigger<? super IN, ? super W> trigger;
+
 	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
 
-	protected transient Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> windows;
+	/**
+	 * If this is true, the current processing time is set as the timestamp of incoming elements.
+	 * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+	 * if eviction should happen based on processing time.
+	 */
+	private boolean setProcessingTime = false;
 
-	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
-	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+	/**
+	 * This is used to copy the incoming element because it can be put into several window
+	 * buffers.
+	 */
+	private TypeSerializer<IN> inputSerializer;
 
+	/**
+	 * For serializing the window in checkpoints.
+	 */
+	private final TypeSerializer<W> windowSerializer;
+
+	// ------------------------------------------------------------------------
+	// State that is not checkpointed
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private transient Map<Long, Set<Context>> processingTimeTimers;
+
+	/**
+	 * Current waiting watermark callbacks.
+	 */
+	private transient Map<Long, Set<Context>> watermarkTimers;
+
+	/**
+	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
+	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
-	private boolean setProcessingTime = false;
+	// ------------------------------------------------------------------------
+	// State that needs to be checkpointed
+	// ------------------------------------------------------------------------
 
-	private TypeSerializer<IN> inputSerializer;
+	/**
+	 * The windows (panes) that are currently in-flight. Each pane is represented by a
+	 * {@code Context} that holds its {@code WindowBuffer}, trigger state and pending timers.
+	 */
+	protected transient Map<W, Context> windows;
 
+	/**
+	 * Creates a new {@code NonKeyedWindowOperator} based on the given policies and user functions.
+	 */
 	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
 			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
@@ -92,25 +144,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		super(windowFunction);
 
 		this.windowAssigner = requireNonNull(windowAssigner);
+		this.windowSerializer = windowSerializer;
 
 		this.windowBufferFactory = requireNonNull(windowBufferFactory);
-		this.triggerTemplate = requireNonNull(trigger);
+		this.trigger = requireNonNull(trigger);
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
 	}
 
 	@Override
-	public void open() throws Exception {
+	public final void open() throws Exception {
 		super.open();
-		windows = new HashMap<>();
-		watermarkTimers = new HashMap<>();
-		processingTimeTimers = new HashMap<>();
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		if (inputSerializer == null) {
@@ -119,14 +169,47 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		windowBufferFactory.setRuntimeContext(getRuntimeContext());
 		windowBufferFactory.open(getUserFunctionParameters());
+
+		// these could already be initialized from restoreState()
+		if (watermarkTimers == null) {
+			watermarkTimers = new HashMap<>();
+		}
+		if (processingTimeTimers == null) {
+			processingTimeTimers = new HashMap<>();
+		}
+		if (windows == null) {
+			windows = new HashMap<>();
+		}
+
+		// re-register timers that this window context had set
+		for (Context context: windows.values()) {
+			if (context.processingTimeTimer > 0) {
+				Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
+				if (triggers == null) {
+					getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this);
+					triggers = new HashSet<>();
+					processingTimeTimers.put(context.processingTimeTimer, triggers);
+				}
+				triggers.add(context);
+			}
+			if (context.watermarkTimer > 0) {
+				Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
+				if (triggers == null) {
+					triggers = new HashSet<>();
+					watermarkTimers.put(context.watermarkTimer, triggers);
+				}
+				triggers.add(context);
+			}
+
+		}
 	}
 
 	@Override
-	public void close() throws Exception {
+	public final void close() throws Exception {
 		super.close();
 		// emit the elements that we still keep
-		for (W window: windows.keySet()) {
-			emitWindow(window, false);
+		for (Context window: windows.values()) {
+			emitWindow(window);
 		}
 		windows.clear();
 		windowBufferFactory.close();
@@ -134,58 +217,60 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void processElement(StreamRecord<IN> element) throws Exception {
+	public final void processElement(StreamRecord<IN> element) throws Exception {
 		if (setProcessingTime) {
 			element.replace(element.getValue(), System.currentTimeMillis());
 		}
+
 		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
 		for (W window: elementWindows) {
-			Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = windows.get(window);
-			if (bufferAndTrigger == null) {
-				bufferAndTrigger = new Tuple2<>();
-				bufferAndTrigger.f0 = windowBufferFactory.create();
-				bufferAndTrigger.f1 = new TriggerContext(window, triggerTemplate.duplicate());
-				windows.put(window, bufferAndTrigger);
+			Context context = windows.get(window);
+			if (context == null) {
+				WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
+				context = new Context(window, windowBuffer);
+				windows.put(window, context);
 			}
 			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			bufferAndTrigger.f0.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+			context.windowBuffer.storeElement(elementCopy);
+			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, window);
 		}
 	}
 
-	protected void emitWindow(W window, boolean purge) throws Exception {
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = windows.remove(window);
-		} else {
-			bufferAndTrigger = windows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} already gone.", window);
-			return;
-		}
-
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
 		userFunction.apply(
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
 		switch (triggerResult) {
-			case FIRE:
-				emitWindow(window, false);
+			case FIRE: {
+				Context context = windows.get(window);
+				if (context == null) {
+					LOG.debug("Window {} already gone.", window);
+					return;
+				}
+
+
+				emitWindow(context);
 				break;
+			}
 
-			case FIRE_AND_PURGE:
-				emitWindow(window, true);
+			case FIRE_AND_PURGE: {
+				Context context = windows.remove(window);
+				if (context == null) {
+					LOG.debug("Window {} already gone.", window);
+					return;
+				}
+
+				emitWindow(context);
 				break;
+			}
 
 			case CONTINUE:
 				// ingore
@@ -193,14 +278,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
+	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
-					processTriggerResult(triggerResult, trigger.window);
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
+					processTriggerResult(triggerResult, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -213,14 +298,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void trigger(long time) throws Exception {
+	public final void trigger(long time) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
+		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
 			if (triggers.getKey() < time) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
-					processTriggerResult(triggerResult, trigger.window);
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+					processTriggerResult(triggerResult, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -231,35 +316,139 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		}
 	}
 
-	protected class TriggerContext implements Trigger.TriggerContext {
-		Trigger<? super IN, ? super W> trigger;
-		W window;
+	/**
+	 * A context object that is given to {@code Trigger} functions to allow them to register
+	 * timer/watermark callbacks.
+	 */
+	protected class Context implements Trigger.TriggerContext {
+		protected W window;
+
+		protected WindowBuffer<IN> windowBuffer;
+
+		protected HashMap<String, Serializable> state;
+
+		// use these to only allow one timer in flight at a time of each type
+		// if the trigger registers another timer this value here will be overwritten,
+		// the timer is not removed from the set of in-flight timers to improve performance.
+		// When a trigger fires it is just checked against the last timer that was set.
+		protected long watermarkTimer;
+		protected long processingTimeTimer;
 
-		public TriggerContext(W window, Trigger<? super IN, ? super W> trigger) {
+		public Context(
+				W window,
+				WindowBuffer<IN> windowBuffer) {
 			this.window = window;
-			this.trigger = trigger;
+			this.windowBuffer = windowBuffer;
+			state = new HashMap<>();
+
+			this.watermarkTimer = -1;
+			this.processingTimeTimer = -1;
+		}
+
+
+		@SuppressWarnings("unchecked")
+		protected Context(DataInputView in) throws Exception {
+			this.window = windowSerializer.deserialize(in);
+			this.watermarkTimer = in.readLong();
+			this.processingTimeTimer = in.readLong();
+
+			int stateSize = in.readInt();
+			byte[] stateData = new byte[stateSize];
+			in.read(stateData);
+			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
+			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+
+			this.windowBuffer = windowBufferFactory.create();
+			int numElements = in.readInt();
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			for (int i = 0; i < numElements; i++) {
+				windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+			}
+		}
+
+		protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+			windowSerializer.serialize(window, out);
+			out.writeLong(watermarkTimer);
+			out.writeLong(processingTimeTimer);
+
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			SerializationUtils.serialize(state, baos);
+			out.writeInt(baos.size());
+			out.write(baos.toByteArray(), 0, baos.size());
+
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			out.writeInt(windowBuffer.size());
+			for (StreamRecord<IN> element: windowBuffer.getElements()) {
+				recordSerializer.serialize(element, out);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
+			return new OperatorState<S>() {
+				@Override
+				public S value() throws IOException {
+					Serializable value = state.get(name);
+					if (value == null) {
+						state.put(name, defaultState);
+						value = defaultState;
+					}
+					return (S) value;
+				}
+
+				@Override
+				public void update(S value) throws IOException {
+					state.put(name, value);
+				}
+			};
 		}
 
 		@Override
 		public void registerProcessingTimeTimer(long time) {
-			Set<TriggerContext> triggers = processingTimeTimers.get(time);
+			if (this.processingTimeTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = processingTimeTimers.get(time);
 			if (triggers == null) {
 				getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
 				triggers = new HashSet<>();
 				processingTimeTimers.put(time, triggers);
 			}
+			this.processingTimeTimer = time;
 			triggers.add(this);
 		}
 
 		@Override
 		public void registerWatermarkTimer(long time) {
-			Set<TriggerContext> triggers = watermarkTimers.get(time);
+			if (watermarkTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = watermarkTimers.get(time);
 			if (triggers == null) {
 				triggers = new HashSet<>();
 				watermarkTimers.put(time, triggers);
 			}
+			this.watermarkTimer = time;
 			triggers.add(this);
 		}
+
+		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+			if (time == processingTimeTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
+
+		public Trigger.TriggerResult onEventTime(long time) throws Exception {
+			if (time == watermarkTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
 	}
 
 	/**
@@ -274,7 +463,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+	public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
 		if (userFunction instanceof OutputTypeConfigurable) {
 			@SuppressWarnings("unchecked")
 			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
@@ -283,12 +472,59 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		// we write the panes with the key/value maps into the stream
+		StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+		int numWindows = windows.size();
+		out.writeInt(numWindows);
+		for (Context context: windows.values()) {
+			context.writeToState(out);
+		}
+
+		taskState.setOperatorState(out.closeAndGetHandle());
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState taskState) throws Exception {
+		super.restoreState(taskState);
+
+
+		@SuppressWarnings("unchecked")
+		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+		DataInputView in = inputState.getState(getUserCodeClassloader());
+
+		int numWindows = in.readInt();
+		this.windows = new HashMap<>(numWindows);
+		this.processingTimeTimers = new HashMap<>();
+		this.watermarkTimers = new HashMap<>();
+
+		for (int j = 0; j < numWindows; j++) {
+			Context context = new Context(in);
+			windows.put(context.window, context);
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
-	public Trigger<? super IN, ? super W> getTriggerTemplate() {
-		return triggerTemplate;
+	public boolean isSetProcessingTime() {
+		return setProcessingTime;
+	}
+
+	@VisibleForTesting
+	public Trigger<? super IN, ? super W> getTrigger() {
+		return trigger;
 	}
 
 	@VisibleForTesting
@@ -300,9 +536,4 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
 		return windowBufferFactory;
 	}
-
-	@VisibleForTesting
-	public boolean isSetProcessingTime() {
-		return setProcessingTime;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 82a3f9a..0b3274f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,13 +18,16 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -38,10 +41,16 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,49 +93,77 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
 
+	// ------------------------------------------------------------------------
+	// Configuration values and user functions
+	// ------------------------------------------------------------------------
+
 	private final WindowAssigner<? super IN, W> windowAssigner;
 
 	private final KeySelector<IN, K> keySelector;
 
-	private final Trigger<? super IN, ? super W> triggerTemplate;
+	private final Trigger<? super IN, ? super W> trigger;
 
 	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
 
 	/**
-	 * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
-	 * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
+	 * If this is true, the current processing time is set as the timestamp of incoming elements.
+	 * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+	 * if eviction should happen based on processing time.
+	 */
+	private boolean setProcessingTime = false;
+
+	/**
+	 * This is used to copy the incoming element because it can be put into several window
+	 * buffers.
+	 */
+	private TypeSerializer<IN> inputSerializer;
+
+	/**
+	 * For serializing the key in checkpoints.
 	 */
-	protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+	private final TypeSerializer<K> keySerializer;
+
+	/**
+	 * For serializing the window in checkpoints.
+	 */
+	private final TypeSerializer<W> windowSerializer;
+
+	// ------------------------------------------------------------------------
+	// State that is not checkpointed
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Processing time timers that are currently in-flight.
 	 */
-	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+	private transient Map<Long, Set<Context>> processingTimeTimers;
 
 	/**
 	 * Current waiting watermark callbacks.
 	 */
-	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+	private transient Map<Long, Set<Context>> watermarkTimers;
 
 	/**
 	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
+	// ------------------------------------------------------------------------
+	// State that needs to be checkpointed
+	// ------------------------------------------------------------------------
+
 	/**
-	 * If this is true. The current processing time is set as the timestamp of incoming elements.
-	 * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
-	 * if eviction should happen based on processing time.
+	 * The windows (panes) that are currently in-flight. Each pane is represented by a
+	 * {@code Context} that holds the pane's {@code WindowBuffer}, key/value state and timers.
 	 */
-	private boolean setProcessingTime = false;
-
-	private TypeSerializer<IN> inputSerializer;
+	protected transient Map<K, Map<W, Context>> windows;
 
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
 	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
 			WindowFunction<IN, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
@@ -134,27 +171,26 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		super(windowFunction);
 
 		this.windowAssigner = requireNonNull(windowAssigner);
+		this.windowSerializer = windowSerializer;
 		this.keySelector = requireNonNull(keySelector);
+		this.keySerializer = requireNonNull(keySerializer);
 
 		this.windowBufferFactory = requireNonNull(windowBufferFactory);
-		this.triggerTemplate = requireNonNull(trigger);
+		this.trigger = requireNonNull(trigger);
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
-//		forceInputCopy();
 	}
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
 	}
 
 	@Override
-	public void open() throws Exception {
+	public final void open() throws Exception {
 		super.open();
-		windows = new HashMap<>();
-		watermarkTimers = new HashMap<>();
-		processingTimeTimers = new HashMap<>();
+
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		if (inputSerializer == null) {
@@ -163,17 +199,53 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		windowBufferFactory.setRuntimeContext(getRuntimeContext());
 		windowBufferFactory.open(getUserFunctionParameters());
+
+
+		// these could already be initialized from restoreState()
+		if (watermarkTimers == null) {
+			watermarkTimers = new HashMap<>();
+		}
+		if (processingTimeTimers == null) {
+			processingTimeTimers = new HashMap<>();
+		}
+		if (windows == null) {
+			windows = new HashMap<>();
+		}
+
+		// re-register timers that this window context had set
+		for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+			Map<W, Context> keyWindows = entry.getValue();
+			for (Context context: keyWindows.values()) {
+				if (context.processingTimeTimer > 0) {
+					Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
+					if (triggers == null) {
+						getRuntimeContext().registerTimer(context.processingTimeTimer, WindowOperator.this);
+						triggers = new HashSet<>();
+						processingTimeTimers.put(context.processingTimeTimer, triggers);
+					}
+					triggers.add(context);
+				}
+				if (context.watermarkTimer > 0) {
+					Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
+					if (triggers == null) {
+						triggers = new HashSet<>();
+						watermarkTimers.put(context.watermarkTimer, triggers);
+					}
+					triggers.add(context);
+				}
+
+			}
+		}
 	}
 
 	@Override
-	public void close() throws Exception {
+	public final void close() throws Exception {
 		super.close();
 		// emit the elements that we still keep
-		for (Map.Entry<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> entry: windows.entrySet()) {
-			K key = entry.getKey();
-			Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = entry.getValue();
-			for (W window: keyWindows.keySet()) {
-				emitWindow(key, window, false);
+		for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+			Map<W, Context> keyWindows = entry.getValue();
+			for (Context window: keyWindows.values()) {
+				emitWindow(window);
 			}
 		}
 		windows.clear();
@@ -182,77 +254,81 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void processElement(StreamRecord<IN> element) throws Exception {
+	public final void processElement(StreamRecord<IN> element) throws Exception {
 		if (setProcessingTime) {
 			element.replace(element.getValue(), System.currentTimeMillis());
 		}
+
 		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
 		K key = keySelector.getKey(element.getValue());
 
-		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+		Map<W, Context> keyWindows = windows.get(key);
 		if (keyWindows == null) {
 			keyWindows = new HashMap<>();
 			windows.put(key, keyWindows);
 		}
 
 		for (W window: elementWindows) {
-			Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = keyWindows.get(window);
-			if (bufferAndTrigger == null) {
-				bufferAndTrigger = new Tuple2<>();
-				bufferAndTrigger.f0 = windowBufferFactory.create();
-				bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate());
-				keyWindows.put(window, bufferAndTrigger);
+			Context context = keyWindows.get(window);
+			if (context == null) {
+				WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
+				context = new Context(key, window, windowBuffer);
+				keyWindows.put(window, context);
 			}
 			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			bufferAndTrigger.f0.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+			context.windowBuffer.storeElement(elementCopy);
+			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, key, window);
 		}
 	}
 
-	protected void emitWindow(K key, W window, boolean purge) throws Exception {
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
-
-		if (keyWindows == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = keyWindows.remove(window);
-		} else {
-			bufferAndTrigger = keyWindows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
-
-		userFunction.apply(key,
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+		userFunction.apply(context.key,
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
-
-		if (keyWindows.isEmpty()) {
-			windows.remove(key);
-		}
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
 		switch (triggerResult) {
-			case FIRE:
-				emitWindow(key, window, false);
+			case FIRE: {
+				Map<W, Context> keyWindows = windows.get(key);
+				if (keyWindows == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+				Context context = keyWindows.get(window);
+				if (context == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+
+
+				emitWindow(context);
 				break;
+			}
 
-			case FIRE_AND_PURGE:
-				emitWindow(key, window, true);
+			case FIRE_AND_PURGE: {
+				Map<W, Context> keyWindows = windows.get(key);
+				if (keyWindows == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+				Context context = keyWindows.remove(window);
+				if (context == null) {
+					LOG.debug("Window {} for key {} already gone.", window, key);
+					return;
+				}
+				if (keyWindows.isEmpty()) {
+					windows.remove(key);
+				}
+
+				emitWindow(context);
 				break;
+			}
 
 			case CONTINUE:
 				// ingore
@@ -260,14 +336,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
+	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
-					processTriggerResult(triggerResult, trigger.key, trigger.window);
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
+					processTriggerResult(triggerResult, context.key, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -280,14 +356,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void trigger(long time) throws Exception {
+	public final void trigger(long time) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
 
-		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
+		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
 			if (triggers.getKey() < time) {
-				for (TriggerContext trigger: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
-					processTriggerResult(triggerResult, trigger.key, trigger.window);
+				for (Context context: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = context.onProcessingTime(time);
+					processTriggerResult(triggerResult, context.key, context.window);
 				}
 				toRemove.add(triggers.getKey());
 			}
@@ -302,37 +378,146 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	 * A context object that is given to {@code Trigger} functions to allow them to register
 	 * timer/watermark callbacks.
 	 */
-	protected class TriggerContext implements Trigger.TriggerContext {
-		Trigger<? super IN, ? super W> trigger;
-		K key;
-		W window;
+	protected class Context implements Trigger.TriggerContext {
+		protected K key;
+		protected W window;
+
+		protected WindowBuffer<IN> windowBuffer;
 
-		public TriggerContext(K key, W window, Trigger<? super IN, ? super W> trigger) {
+		protected HashMap<String, Serializable> state;
+
+		// These allow only one timer of each type to be in flight at a time.
+		// If the trigger registers another timer, the value here is overwritten; the old
+		// timer is not removed from the set of in-flight timers, to improve performance.
+		// When a timer fires it is just checked against the last timer that was set.
+		protected long watermarkTimer;
+		protected long processingTimeTimer;
+
+		public Context(K key,
+				W window,
+				WindowBuffer<IN> windowBuffer) {
 			this.key = key;
 			this.window = window;
-			this.trigger = trigger;
+			this.windowBuffer = windowBuffer;
+			state = new HashMap<>();
+
+			this.watermarkTimer = -1;
+			this.processingTimeTimer = -1;
+		}
+
+		/**
+		 * Constructs a new {@code Context} by reading from a {@link DataInputView} that
+		 * contains a serialized context that we wrote in
+		 * {@link #writeToState(StateBackend.CheckpointStateOutputView)}.
+		 */
+		@SuppressWarnings("unchecked")
+		protected Context(DataInputView in) throws Exception {
+			this.key = keySerializer.deserialize(in);
+			this.window = windowSerializer.deserialize(in);
+			this.watermarkTimer = in.readLong();
+			this.processingTimeTimer = in.readLong();
+
+			int stateSize = in.readInt();
+			byte[] stateData = new byte[stateSize];
+			in.read(stateData);
+			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
+			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+
+			this.windowBuffer = windowBufferFactory.create();
+			int numElements = in.readInt();
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			for (int i = 0; i < numElements; i++) {
+				windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+			}
+		}
+
+		/**
+		 * Writes the {@code Context} to the given state checkpoint output.
+		 */
+		protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+			keySerializer.serialize(key, out);
+			windowSerializer.serialize(window, out);
+			out.writeLong(watermarkTimer);
+			out.writeLong(processingTimeTimer);
+
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			SerializationUtils.serialize(state, baos);
+			out.writeInt(baos.size());
+			out.write(baos.toByteArray(), 0, baos.size());
+
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			out.writeInt(windowBuffer.size());
+			for (StreamRecord<IN> element: windowBuffer.getElements()) {
+				recordSerializer.serialize(element, out);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
+			return new OperatorState<S>() {
+				@Override
+				public S value() throws IOException {
+					Serializable value = state.get(name);
+					if (value == null) {
+						state.put(name, defaultState);
+						value = defaultState;
+					}
+					return (S) value;
+				}
+
+				@Override
+				public void update(S value) throws IOException {
+					state.put(name, value);
+				}
+			};
 		}
 
 		@Override
 		public void registerProcessingTimeTimer(long time) {
-			Set<TriggerContext> triggers = processingTimeTimers.get(time);
+			if (this.processingTimeTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = processingTimeTimers.get(time);
 			if (triggers == null) {
 				getRuntimeContext().registerTimer(time, WindowOperator.this);
 				triggers = new HashSet<>();
 				processingTimeTimers.put(time, triggers);
 			}
+			this.processingTimeTimer = time;
 			triggers.add(this);
 		}
 
 		@Override
 		public void registerWatermarkTimer(long time) {
-			Set<TriggerContext> triggers = watermarkTimers.get(time);
+			if (watermarkTimer == time) {
+				// we already have set a trigger for that time
+				return;
+			}
+			Set<Context> triggers = watermarkTimers.get(time);
 			if (triggers == null) {
 				triggers = new HashSet<>();
 				watermarkTimers.put(time, triggers);
 			}
+			this.watermarkTimer = time;
 			triggers.add(this);
 		}
+
+		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
+			if (time == processingTimeTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
+
+		public Trigger.TriggerResult onEventTime(long time) throws Exception {
+			if (time == watermarkTimer) {
+				return trigger.onTime(time, this);
+			} else {
+				return Trigger.TriggerResult.CONTINUE;
+			}
+		}
 	}
 
 	/**
@@ -347,7 +532,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+	public final void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
 		if (userFunction instanceof OutputTypeConfigurable) {
 			@SuppressWarnings("unchecked")
 			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
@@ -356,6 +541,60 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		// we write the panes with the key/value maps into the stream
+		StateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+		int numKeys = windows.size();
+		out.writeInt(numKeys);
+
+		for (Map.Entry<K, Map<W, Context>> keyWindows: windows.entrySet()) {
+			int numWindows = keyWindows.getValue().size();
+			out.writeInt(numWindows);
+			for (Context context: keyWindows.getValue().values()) {
+				context.writeToState(out);
+			}
+		}
+
+		taskState.setOperatorState(out.closeAndGetHandle());
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState taskState) throws Exception {
+		super.restoreState(taskState);
+
+
+		@SuppressWarnings("unchecked")
+		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+		DataInputView in = inputState.getState(getUserCodeClassloader());
+
+		int numKeys = in.readInt();
+		this.windows = new HashMap<>(numKeys);
+		this.processingTimeTimers = new HashMap<>();
+		this.watermarkTimers = new HashMap<>();
+
+		for (int i = 0; i < numKeys; i++) {
+			int numWindows = in.readInt();
+			for (int j = 0; j < numWindows; j++) {
+				Context context = new Context(in);
+				Map<W, Context> keyWindows = windows.get(context.key);
+				if (keyWindows == null) {
+					keyWindows = new HashMap<>(numWindows);
+					windows.put(context.key, keyWindows);
+				}
+				keyWindows.put(context.window, context);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------
 
@@ -365,8 +604,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@VisibleForTesting
-	public Trigger<? super IN, ? super W> getTriggerTemplate() {
-		return triggerTemplate;
+	public Trigger<? super IN, ? super W> getTrigger() {
+		return trigger;
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 4fa16ac..45ef29f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -71,7 +71,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -119,7 +119,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -143,7 +143,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
@@ -194,7 +194,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 3139941..39033cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -54,6 +54,7 @@ public class EvictingNonKeyedWindowOperatorTest {
 
 		EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
 				CountTrigger.of(WINDOW_SLIDE),

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 3d9605e..afc65d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import jdk.nashorn.internal.objects.Global;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -54,7 +56,9 @@ public class EvictingWindowOperatorTest {
 
 		EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
 				CountTrigger.of(WINDOW_SLIDE),

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 6cc8931..a91d957 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -76,6 +76,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -156,6 +157,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -234,6 +236,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -312,6 +315,7 @@ public class NonKeyedWindowOperatorTest {
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index d387df0..e825b88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -76,7 +77,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
 				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -163,7 +166,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
 				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -246,7 +251,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -331,7 +338,9 @@ public class WindowOperatorTest {
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
 				GlobalWindows.create(),
+				new GlobalWindow.Serializer(),
 				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 10fe734..02ec820 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -116,7 +116,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -140,7 +140,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -166,7 +166,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -191,7 +191,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertTrue(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
@@ -217,7 +217,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
 		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getTrigger() instanceof WatermarkTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
@@ -244,7 +244,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
 		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
 		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 950b0f5..60b7894 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -97,8 +98,7 @@ public class SessionWindowing {
 
 		private static final long serialVersionUID = 1L;
 
-		private volatile Long lastSeenEvent = 1L;
-		private Long sessionTimeout;
+		private final Long sessionTimeout;
 
 		public SessionTrigger(Long sessionTimeout) {
 			this.sessionTimeout = sessionTimeout;
@@ -106,13 +106,17 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) {
-			Long timeSinceLastEvent = timestamp - lastSeenEvent;
+		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
+
+			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+			Long lastSeen = lastSeenState.value();
+
+			Long timeSinceLastEvent = timestamp - lastSeen;
 
 			// Update the last seen event time
-			lastSeenEvent = timestamp;
+			lastSeenState.update(timestamp);
 
-			ctx.registerWatermarkTimer(lastSeenEvent + sessionTimeout);
+			ctx.registerWatermarkTimer(lastSeen + sessionTimeout);
 
 			if (timeSinceLastEvent > sessionTimeout) {
 				return TriggerResult.FIRE_AND_PURGE;
@@ -122,17 +126,15 @@ public class SessionWindowing {
 		}
 
 		@Override
-		public TriggerResult onTime(long time, TriggerContext ctx) {
-			if (time - lastSeenEvent >= sessionTimeout) {
+		public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
+			OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+			Long lastSeen = lastSeenState.value();
+
+			if (time - lastSeen >= sessionTimeout) {
 				return TriggerResult.FIRE_AND_PURGE;
 			}
 			return TriggerResult.CONTINUE;
 		}
-
-		@Override
-		public SessionTrigger duplicate() {
-			return new SessionTrigger(sessionTimeout);
-		}
 	}
 
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 33104ab..0357144 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -202,6 +202,58 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window. The output of the window function is interpreted as a regular
+   * non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function used for pre-aggregation (passed through `clean`).
+   * @param function The window function (passed through `clean`).
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: ReduceFunction[T],
+      function: AllWindowFunction[T, R, W]): DataStream[R] = {
+    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window. The output of the window function is interpreted as a regular
+   * non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation; must not be null.
+   * @param function The window function; must not be null.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: (T, T) => T,
+      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new AllWindowFunction[T, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanApply(window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+  }
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index d4f4618..93b91ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -196,6 +196,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
     val cleanedFunction = clean(function)
     val applyFunction = new WindowFunction[T, R, K, W] {
       def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
@@ -205,6 +209,58 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window for each key individually. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function used for pre-aggregation (passed through `clean`).
+   * @param function The window function (passed through `clean`).
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: ReduceFunction[T],
+      function: WindowFunction[T, R, K, W]): DataStream[R] = {
+    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is called for each
+   * evaluation of the window for each key individually. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation; must not be null.
+   * @param function The window function; must not be null.
+   * @return The data stream that is the result of applying the window function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: (T, T) => T,
+      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new WindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
+        cleanApply(key, window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+  }
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 99fcd07..7da7bc3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -22,8 +22,9 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
@@ -111,7 +112,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -134,7 +135,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
@@ -161,7 +162,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
     val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -185,11 +186,72 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
     val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
+
+  @Test
+  def testPreReduce(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    // Verifies that apply(reducer, windowFunction) yields a pre-aggregating window buffer.
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+    // NOTE(review): keyed path (keyBy/WindowOperator) in an all-window test - confirm intent.
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(
+      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+    // Same check for tumbling windows: a pre-aggregating buffer factory is expected again.
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(
+      winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+  }
+
 }
 
 // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 65f978c..46981ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -108,7 +108,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -133,7 +133,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
@@ -161,7 +161,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -187,9 +187,69 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
+
+  /**
+   * Verifies that a keyed window with a reducer and a WindowFunction pre-aggregates.
+   * Both a sliding and a tumbling time window with a count trigger must translate to a
+   * WindowOperator backed by a PreAggregatingHeapWindowBuffer factory.
+   */
+  @Test
+  def testPreReduce(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+    val reducer = new DummyReducer
+
+    // No-op window function: this test only inspects the translated operator.
+    val noOpFunction = new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+      def apply(
+          tuple: Tuple,
+          window: TimeWindow,
+          values: java.lang.Iterable[(String, Int)],
+          out: Collector[(String, Int)]): Unit = { }
+    }
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, noOpFunction)
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    // With a reducer supplied, the pre-aggregating buffer factory is expected.
+    assertTrue(
+      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, noOpFunction)
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(
+      winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+  }
 }