You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/20 18:42:23 UTC

[4/4] flink git commit: [FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant

[FLINK-2864] Make State of General-Purpose Window Operators Fault-Tolerant

This adds method state() on Trigger context that should be used to
create an OperatorState to deal with fault-tolerant state.

WindowAssigner now has a method getWindowSerializer() that is used to
get a TypeSerializer for the Windows that it assigns. The Serializer for
the Key is retrieved from the input KeyedStream and the serializer for
the input elements is already available.

During checkpointing all currently in-flight windows (per key, per
window) are serialized using the TypeSerializers. The state that is
accessible in Triggers using state() is kept in a
HashMap<String, Serializable>, this is serialized using java
serialization.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44422697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44422697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44422697

Branch: refs/heads/master
Commit: 444226970e31856787fbebdd7793805293faf13b
Parents: e711969
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 11 11:37:29 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 20 18:39:12 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |   1 +
 .../api/datastream/AllWindowedStream.java       |  73 ++-
 .../api/datastream/WindowedStream.java          |  83 ++-
 .../source/RichEventTimeSourceFunction.java     |  47 ++
 .../api/windowing/assigners/GlobalWindows.java  |  10 +-
 .../windowing/assigners/SlidingTimeWindows.java |   9 +-
 .../assigners/TumblingTimeWindows.java          |   8 +-
 .../api/windowing/assigners/WindowAssigner.java |   8 +
 .../ContinuousProcessingTimeTrigger.java        |  28 +-
 .../triggers/ContinuousWatermarkTrigger.java    |  20 +-
 .../api/windowing/triggers/CountTrigger.java    |  22 +-
 .../api/windowing/triggers/DeltaTrigger.java    |  29 +-
 .../triggers/ProcessingTimeTrigger.java         |  12 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   9 +-
 .../api/windowing/triggers/Trigger.java         |  29 +-
 .../windowing/triggers/WatermarkTrigger.java    |  14 +-
 .../api/windowing/windows/GlobalWindow.java     |  97 ++-
 .../api/windowing/windows/TimeWindow.java       | 100 ++-
 .../streaming/api/windowing/windows/Window.java |  12 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   2 +-
 .../EvictingNonKeyedWindowOperator.java         |  37 +-
 .../windowing/EvictingWindowOperator.java       |  53 +-
 .../windowing/NonKeyedWindowOperator.java       | 379 +++++++++---
 .../operators/windowing/WindowOperator.java     | 421 ++++++++++---
 .../windowing/AllWindowTranslationTest.java     |  12 +-
 .../EvictingNonKeyedWindowOperatorTest.java     |   1 +
 .../windowing/EvictingWindowOperatorTest.java   |   4 +
 .../windowing/NonKeyedWindowOperatorTest.java   |   4 +
 .../operators/windowing/WindowOperatorTest.java |   9 +
 .../windowing/WindowTranslationTest.java        |  12 +-
 .../examples/windowing/SessionWindowing.java    |  28 +-
 .../streaming/api/scala/AllWindowedStream.scala |  52 ++
 .../streaming/api/scala/WindowedStream.scala    |  56 ++
 .../api/scala/AllWindowTranslationTest.scala    |  72 ++-
 .../api/scala/WindowTranslationTest.scala       |  68 ++-
 .../EventTimeAllWindowCheckpointingITCase.java  | 603 ++++++++++++++++++
 .../EventTimeWindowCheckpointingITCase.java     | 605 +++++++++++++++++++
 .../WindowCheckpointingITCase.java              |  40 +-
 38 files changed, 2665 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 9ed3e92..f38ecb0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -151,6 +151,7 @@ public class ExecutionConfig implements Serializable {
 	 * @param interval The interval between watermarks in milliseconds.
 	 */
 	public ExecutionConfig setAutoWatermarkInterval(long interval) {
+		enableTimestamps();
 		this.autoWatermarkInterval = interval;
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 83e7adc..7191304 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -146,6 +146,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					new ReduceAllWindowFunction<W, T>(function),
 					trigger,
@@ -157,6 +158,7 @@ public class AllWindowedStream<T, W extends Window> {
 			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
 
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
 					new ReduceAllWindowFunction<W, T>(function),
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -232,7 +234,7 @@ public class AllWindowedStream<T, W extends Window> {
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "MapWindow at " + callLocation;
+		String udfName = "WindowApply at " + callLocation;
 
 		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
 		if (result != null) {
@@ -248,6 +250,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger,
@@ -255,6 +258,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		} else {
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -263,6 +267,73 @@ public class AllWindowedStream<T, W extends Window> {
 		return input.transform(opName, resultType, operator).setParallelism(1);
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, AllWindowFunction.class, true, true, inType, null, false);
+
+		return apply(preAggregator, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, R> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Aggregations on the  windows
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index f1220de..033e84f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -158,7 +158,9 @@ public class WindowedStream<T, K, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					new ReduceWindowFunction<K, W, T>(function),
 					trigger,
@@ -170,7 +172,9 @@ public class WindowedStream<T, K, W extends Window> {
 			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
 
 			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
 					new ReduceWindowFunction<K, W, T>(function),
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -241,6 +245,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * is evaluated, as the function provides no means of pre-aggregation.
 	 *
 	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
@@ -248,7 +253,7 @@ public class WindowedStream<T, K, W extends Window> {
 		function = input.getExecutionEnvironment().clean(function);
 
 		String callLocation = Utils.getCallLocationName();
-		String udfName = "MapWindow at " + callLocation;
+		String udfName = "WindowApply at " + callLocation;
 
 		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
 		if (result != null) {
@@ -265,7 +270,9 @@ public class WindowedStream<T, K, W extends Window> {
 
 		if (evictor != null) {
 			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger,
@@ -273,7 +280,9 @@ public class WindowedStream<T, K, W extends Window> {
 
 		} else {
 			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger).enableSetProcessingTime(setProcessingTime);
@@ -282,6 +291,78 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, WindowFunction.class, true, true, inType, null, false);
+
+		return apply(preAggregator, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Aggregations on the keyed windows
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
new file mode 100644
index 0000000..6e0086d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for implementing a parallel event-time data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
+ *
+ * <p>
+ * This class is useful when implementing parallel sources where different parallel subtasks
+ * need to perform different work. Typical patterns for that are:
+ * <ul>
+ *     <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
+ *         to determine the current parallelism. It is strongly encouraged to use this method, rather than
+ *         hard-wiring the parallelism, because the configured parallelism may change depending on
+ *         program configuration. The parallelism may also change after recovering failures, when fewer than
+ *         desired parallel worker as available.</li>
+ *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
+ *         determine which subtask the current instance of the function executes.</li>
+ * </ul>
+ *
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichEventTimeSourceFunction<OUT> extends AbstractRichFunction implements EventTimeSourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 66c3287..9b7c8f2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -80,10 +82,10 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 		public TriggerResult onTime(long time, TriggerContext ctx) {
 			return TriggerResult.CONTINUE;
 		}
+	}
 
-		@Override
-		public Trigger<Object, GlobalWindow> duplicate() {
-			return this;
-		}
+	@Override
+	public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new GlobalWindow.Serializer();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 6036dfb..7b1f1f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
@@ -61,7 +63,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 		for (long start = lastStart;
 			start > timestamp - size;
 			start -= slide) {
-			windows.add(new TimeWindow(start, size));
+			windows.add(new TimeWindow(start, start + size));
 		}
 		return windows;
 	}
@@ -99,4 +101,9 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
 		return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
 	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index d57dc33..aa019e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
@@ -53,7 +55,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
 		long start = timestamp - (timestamp % size);
-		return Collections.singletonList(new TimeWindow(start, size));
+		return Collections.singletonList(new TimeWindow(start, start + size));
 	}
 
 	public long getSize() {
@@ -85,4 +87,8 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 		return new TumblingTimeWindows(size.toMilliseconds());
 	}
 
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index d0b1ed0..4b4b1ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -52,4 +54,10 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
 	 * Returns the default trigger associated with this {@code WindowAssigner}.
 	 */
 	public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
+
+	/**
+	 * Returns a {@link TypeSerializer} for serializing windows that are assigned by
+	 * this {@code WindowAssigner}.
+	 */
+	public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index f23f6ee..3ea60f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -30,27 +31,29 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long interval;
-
-	private long nextFireTimestamp = 0;
+	private final long interval;
 
 	private ContinuousProcessingTimeTrigger(long interval) {
 		this.interval = interval;
 	}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
 		long currentTime = System.currentTimeMillis();
+
+		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
+		long nextFireTimestamp = fireState.value();
+
 		if (nextFireTimestamp == 0) {
 			long start = currentTime - (currentTime % interval);
-			nextFireTimestamp = start + interval;
+			fireState.update(start + interval);
 
 			ctx.registerProcessingTimeTimer(nextFireTimestamp);
 			return TriggerResult.CONTINUE;
 		}
 		if (currentTime > nextFireTimestamp) {
 			long start = currentTime - (currentTime % interval);
-			nextFireTimestamp = start + interval;
+			fireState.update(start + interval);
 
 			ctx.registerProcessingTimeTimer(nextFireTimestamp);
 
@@ -60,22 +63,21 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
+	public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
+
+		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
+		long nextFireTimestamp = fireState.value();
+
 		// only fire if an element didn't already fire
 		long currentTime = System.currentTimeMillis();
 		if (currentTime > nextFireTimestamp) {
 			long start = currentTime - (currentTime % interval);
-			nextFireTimestamp = start + interval;
+			fireState.update(start + interval);
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
 	}
 
-	@Override
-	public Trigger<Object, W> duplicate() {
-		return new ContinuousProcessingTimeTrigger<>(interval);
-	}
-
 	@VisibleForTesting
 	public long getInterval() {
 		return interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index 02ea81d..494ba3a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -32,22 +33,24 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long interval;
-
-	private boolean first = true;
+	private final long interval;
 
 	private ContinuousWatermarkTrigger(long interval) {
 		this.interval = interval;
 	}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
-		if (first) {
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
+
+		OperatorState<Boolean> first = ctx.getKeyValueState("first", true);
+
+		if (first.value()) {
 			long start = timestamp - (timestamp % interval);
 			long nextFireTimestamp = start + interval;
 
 			ctx.registerWatermarkTimer(nextFireTimestamp);
-			first = false;
+
+			first.update(false);
 			return TriggerResult.CONTINUE;
 		}
 		return TriggerResult.CONTINUE;
@@ -60,11 +63,6 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
 	}
 
 	@Override
-	public Trigger<Object, W> duplicate() {
-		return new ContinuousWatermarkTrigger<>(interval);
-	}
-
-	@Override
 	public String toString() {
 		return "ContinuousProcessingTimeTrigger(" + interval + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 53480fe..57582f7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -17,8 +17,11 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+import java.io.IOException;
+
 /**
  * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
  *
@@ -27,19 +30,19 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long maxCount;
-	private long count;
+	private final long maxCount;
 
 	private CountTrigger(long maxCount) {
 		this.maxCount = maxCount;
-		count = 0;
 	}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
-		count++;
-		if (count >= maxCount) {
-			count = 0;
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
+		OperatorState<Long> count = ctx.getKeyValueState("count", 0L);
+		long currentCount = count.value() + 1;
+		count.update(currentCount);
+		if (currentCount >= maxCount) {
+			count.update(0L);
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
@@ -51,11 +54,6 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	}
 
 	@Override
-	public Trigger<Object, W> duplicate() {
-		return new CountTrigger<>(maxCount);
-	}
-
-	@Override
 	public String toString() {
 		return "CountTrigger(" +  maxCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index cf4cf0c..b1283f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -17,9 +17,12 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+import java.io.Serializable;
+
 /**
  * A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold.
  *
@@ -30,12 +33,11 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  *
  * @param <W> The type of {@link Window Windows} on which this trigger can operate.
  */
-public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+public class DeltaTrigger<T extends Serializable, W extends Window> implements Trigger<T, W> {
 	private static final long serialVersionUID = 1L;
 
-	DeltaFunction<T> deltaFunction;
-	private double threshold;
-	private transient T lastElement;
+	private final DeltaFunction<T> deltaFunction;
+	private final double threshold;
 
 	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
 		this.deltaFunction = deltaFunction;
@@ -43,13 +45,14 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
-		if (lastElement == null) {
-			lastElement = element;
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
+		OperatorState<T> lastElementState = ctx.getKeyValueState("last-element", null);
+		if (lastElementState.value() == null) {
+			lastElementState.update(element);
 			return TriggerResult.CONTINUE;
 		}
-		if (deltaFunction.getDelta(lastElement, element) > this.threshold) {
-			lastElement = element;
+		if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
+			lastElementState.update(element);
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
@@ -61,11 +64,6 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public Trigger<T, W> duplicate() {
-		return new DeltaTrigger<>(threshold, deltaFunction);
-	}
-
-	@Override
 	public String toString() {
 		return "DeltaTrigger(" +  deltaFunction + ", " + threshold + ")";
 	}
@@ -78,9 +76,8 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	 *
 	 * @param <T> The type of elements on which this trigger can operate.
 	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
-	 * @return
 	 */
-	public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+	public static <T extends Serializable, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
 		return new DeltaTrigger<>(threshold, deltaFunction);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index cc3440c..70c57ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -26,16 +26,11 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
-	boolean isFirst = true;
-
 	private ProcessingTimeTrigger() {}
 
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		if (isFirst) {
-			ctx.registerProcessingTimeTimer(window.getEnd());
-			isFirst = false;
-		}
+		ctx.registerProcessingTimeTimer(window.maxTimestamp());
 		return TriggerResult.CONTINUE;
 	}
 
@@ -45,11 +40,6 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public Trigger<Object, TimeWindow> duplicate() {
-		return new ProcessingTimeTrigger();
-	}
-
-	@Override
 	public String toString() {
 		return "ProcessingTimeTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 1c896a7..76e36b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -40,7 +40,7 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
 		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
 		switch (triggerResult) {
 			case FIRE:
@@ -53,7 +53,7 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public TriggerResult onTime(long time, TriggerContext ctx) {
+	public TriggerResult onTime(long time, TriggerContext ctx) throws Exception {
 		TriggerResult triggerResult = nestedTrigger.onTime(time, ctx);
 		switch (triggerResult) {
 			case FIRE:
@@ -66,11 +66,6 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	}
 
 	@Override
-	public Trigger<T, W> duplicate() {
-		return new PurgingTrigger<>(nestedTrigger.duplicate());
-	}
-
-	@Override
 	public String toString() {
 		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index f9e2e3e..56b8687 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+
 import java.io.Serializable;
 
 /**
@@ -31,6 +33,11 @@ import java.io.Serializable;
  * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
  * have their own instance of the {@code Trigger}.
  *
+ * <p>
+ * Triggers must not maintain state internally since they can be re-created or reused for
+ * different keys. All necessary state should be persisted using the state abstraction
+ * available on the {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext}.
+ *
  * @param <T> The type of elements on which this {@code Trigger} works.
  * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
  */
@@ -45,7 +52,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param window The window to which this pane belongs.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
+	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
 
 	/**
 	 * Called when a timer that was set using the trigger context fires.
@@ -53,13 +60,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * @param time The timestamp at which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
-	TriggerResult onTime(long time, TriggerContext ctx);
-
-	/**
-	 * Creates a duplicate of the {@code Trigger} without the state of the original {@code Trigger}.
-	 * @return The duplicate {@code Trigger} object.
-	 */
-	Trigger<T, W> duplicate();
+	TriggerResult onTime(long time, TriggerContext ctx) throws Exception;
 
 	/**
 	 * Result type for trigger methods. This determines what happens which the window.
@@ -75,7 +76,7 @@ public interface Trigger<T, W extends Window> extends Serializable {
 
 	/**
 	 * A context object that is given to {@code Trigger} methods to allow them to register timer
-	 * callbacks.
+	 * callbacks and deal with state.
 	 */
 	interface TriggerContext {
 
@@ -96,5 +97,15 @@ public interface Trigger<T, W extends Window> extends Serializable {
 		 * @param time The watermark at which to invoke {@link #onTime(long, TriggerContext)}
 		 */
 		void registerWatermarkTimer(long time);
+
+		/**
+		 * Retrieves an {@link OperatorState} object that can be used to interact with
+		 * fault-tolerant state that is scoped to the window and key of the current
+		 * trigger invocation.
+		 *
+		 * @param name A unique key for the state.
+		 * @param defaultState The default value of the state.
+		 */
+		<S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
index 5d66ba3..d17066b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -28,16 +28,11 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
-	boolean isFirst = true;
-
 	private WatermarkTrigger() {}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		if (isFirst) {
-			ctx.registerWatermarkTimer(window.maxTimestamp());
-			isFirst = false;
-		}
+	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
+		ctx.registerWatermarkTimer(window.maxTimestamp());
 		return TriggerResult.CONTINUE;
 	}
 
@@ -47,11 +42,6 @@ public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
 	}
 
 	@Override
-	public Trigger<Object, TimeWindow> duplicate() {
-		return new WatermarkTrigger();
-	}
-
-	@Override
 	public String toString() {
 		return "WatermarkTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
index e0df19d..f20c779 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -17,6 +17,12 @@
  */
 package org.apache.flink.streaming.api.windowing.windows;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
 public class GlobalWindow extends Window {
 
 	private static GlobalWindow INSTANCE = new GlobalWindow();
@@ -28,29 +34,13 @@ public class GlobalWindow extends Window {
 	}
 
 	@Override
-	public long getStart() {
-		return Long.MIN_VALUE;
-	}
-
-	@Override
-	public long getEnd() {
-		return Long.MAX_VALUE;
-	}
-
-	@Override
 	public long maxTimestamp() {
 		return Long.MAX_VALUE;
 	}
 
 	@Override
 	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		return true;
+		return this == o || !(o == null || getClass() != o.getClass());
 	}
 
 	@Override
@@ -62,4 +52,77 @@ public class GlobalWindow extends Window {
 	public String toString() {
 		return "GlobalWindow";
 	}
+
+	public static class Serializer extends TypeSerializer<GlobalWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<GlobalWindow> duplicate() {
+			return this;
+		}
+
+		@Override
+		public GlobalWindow createInstance() {
+			return GlobalWindow.INSTANCE;
+		}
+
+		@Override
+		public GlobalWindow copy(GlobalWindow from) {
+			return from;
+		}
+
+		@Override
+		public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
+			target.writeByte(0);
+		}
+
+		@Override
+		public GlobalWindow deserialize(DataInputView source) throws IOException {
+			source.readByte();
+			return GlobalWindow.INSTANCE;
+		}
+
+		@Override
+		public GlobalWindow deserialize(GlobalWindow reuse,
+				DataInputView source) throws IOException {
+			source.readByte();
+			return GlobalWindow.INSTANCE;
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			source.readByte();
+			target.writeByte(0);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 20080c0..0c4c2a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -17,31 +17,37 @@
  */
 package org.apache.flink.streaming.api.windowing.windows;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * A {@link Window} that represents a time interval from {@code start} (inclusive) to
+ * {@code start + size} (exclusive).
+ */
 public class TimeWindow extends Window {
-	long start;
-	long end;
 
-	public TimeWindow() {
-	}
+	private final long start;
+	private final long end;
 
-	public TimeWindow(long start, long size) {
+	public TimeWindow(long start, long end) {
 		this.start = start;
-		this.end = start + size - 1;
+		this.end = end;
 	}
 
-	@Override
 	public long getStart() {
 		return start;
 	}
 
-	@Override
 	public long getEnd() {
 		return end;
 	}
 
 	@Override
 	public long maxTimestamp() {
-		return end;
+		return end - 1;
 	}
 
 	@Override
@@ -72,4 +78,80 @@ public class TimeWindow extends Window {
 				", end=" + end +
 				'}';
 	}
+
+	public static class Serializer extends TypeSerializer<TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<TimeWindow> duplicate() {
+			return this;
+		}
+
+		@Override
+		public TimeWindow createInstance() {
+			return null;
+		}
+
+		@Override
+		public TimeWindow copy(TimeWindow from) {
+			return from;
+		}
+
+		@Override
+		public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(TimeWindow record, DataOutputView target) throws IOException {
+			target.writeLong(record.start);
+			target.writeLong(record.end);
+		}
+
+		@Override
+		public TimeWindow deserialize(DataInputView source) throws IOException {
+			long start = source.readLong();
+			long end = source.readLong();
+			return new TimeWindow(start, end);
+		}
+
+		@Override
+		public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
+			long start = source.readLong();
+			long end = source.readLong();
+			return new TimeWindow(start, end);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.writeLong(source.readLong());
+			target.writeLong(source.readLong());
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof Serializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
index 4e22c32..2e415f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -17,11 +17,15 @@
  */
 package org.apache.flink.streaming.api.windowing.windows;
 
+/**
+ * A {@code Window} is a grouping of elements into finite buckets. Windows have a maximum timestamp
+ * which means that, at some point, all elements that go into one window will have arrived.
+ *
+ * <p>
+ * Subclasses should implement {@code equals()} and {@code hashCode()} so that logically
+ * same windows are treated the same.
+ */
 public abstract class Window {
 
-	public abstract long getStart();
-
-	public abstract long getEnd();
-
 	public abstract long maxTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 9964760..3165f88 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -239,7 +239,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	private void computeWindow(long timestamp) throws Exception {
 		out.setTimestamp(timestamp);
 		panes.truncatePanes(numPanesPerWindow);
-		panes.evaluateWindow(out, new TimeWindow(timestamp, windowSize));
+		panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize));
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index bd3572e..1bb451a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -18,17 +18,14 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -45,51 +42,35 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(EvictingNonKeyedWindowOperator.class);
-
 	private final Evictor<? super IN, ? super W> evictor;
 
 	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
 			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, windowBufferFactory, windowFunction, trigger);
+		super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
 		this.evictor = requireNonNull(evictor);
 	}
 
 	@Override
 	@SuppressWarnings("unchecked, rawtypes")
-	protected void emitWindow(W window, boolean purge) throws Exception {
-
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = windows.remove(window);
-		} else {
-			bufferAndTrigger = windows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} already gone.", window);
-			return;
-		}
-
-
-		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
+		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
 
 		int toEvict = 0;
 		if (windowBuffer.size() > 0) {
 			// need some type trickery here...
-			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), window);
+			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
 		}
 
 		windowBuffer.removeElements(toEvict);
 
 		userFunction.apply(
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 51413bd..ad43812 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -18,20 +18,15 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
@@ -51,64 +46,38 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(EvictingWindowOperator.class);
-
 	private final Evictor<? super IN, ? super W> evictor;
 
 	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
 			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
 			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
 			WindowFunction<IN, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
+		super(windowAssigner, windowSerializer, keySelector, keySerializer, windowBufferFactory, windowFunction, trigger);
 		this.evictor = requireNonNull(evictor);
 	}
 
 	@Override
 	@SuppressWarnings("unchecked, rawtypes")
-	protected void emitWindow(K key, W window, boolean purge) throws Exception {
-
-		timestampedCollector.setTimestamp(window.getEnd());
-
-		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
-
-		if (keyWindows == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
-
-		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-		if (purge) {
-			bufferAndTrigger = keyWindows.remove(window);
-		} else {
-			bufferAndTrigger = keyWindows.get(window);
-		}
-
-		if (bufferAndTrigger == null) {
-			LOG.debug("Window {} for key {} already gone.", window, key);
-			return;
-		}
-
-
-		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+	protected void emitWindow(Context context) throws Exception {
+		timestampedCollector.setTimestamp(context.window.maxTimestamp());
+		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
 
 		int toEvict = 0;
 		if (windowBuffer.size() > 0) {
 			// need some type trickery here...
-			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), window);
+			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
 		}
 
 		windowBuffer.removeElements(toEvict);
 
-		userFunction.apply(key,
-				window,
-				bufferAndTrigger.f0.getUnpackedElements(),
+		userFunction.apply(context.key,
+				context.window,
+				context.windowBuffer.getUnpackedElements(),
 				timestampedCollector);
-
-		if (keyWindows.isEmpty()) {
-			windows.remove(key);
-		}
 	}
 
 	@Override